Skip to content

Commit

Permalink
[pinpoint-apm#11411] Improve write performance of ServerMap Link
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 23, 2024
1 parent c7d1fb1 commit 0180c2b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.navercorp.pinpoint.collector.dao.hbase.statistics;

import com.navercorp.pinpoint.common.hbase.CasResult;
import com.navercorp.pinpoint.common.hbase.CheckAndMax;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.TableNameProvider;
Expand All @@ -12,10 +11,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
* @author emeroad
Expand Down Expand Up @@ -93,12 +93,6 @@ public void flushLink() {

}

private CompletableFuture<CasResult> checkAndMax(TableName tableName, CheckAndMax checkAndMax) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(checkAndMax, "checkAndMax");
return this.asyncTemplate.maxColumnValue(tableName, checkAndMax);
}

@Override
public void flushAvgMax() {

Expand All @@ -110,13 +104,25 @@ public void flushAvgMax() {
}
}

Map<TableName, List<CheckAndMax>> maxUpdates = new HashMap<>();
for (Map.Entry<RowInfo, Long> entry : maxUpdateMap.entrySet()) {
final RowInfo rowInfo = entry.getKey();
final Long val = entry.getValue();
final byte[] rowKey = getDistributedKey(rowInfo.getRowKey().getRowKey());
byte[] columnName = rowInfo.getColumnName().getColumnName();
CheckAndMax checkAndMax = new CheckAndMax(rowKey, getColumnFamilyName(), columnName, val);
checkAndMax(rowInfo.getTableName(), checkAndMax);

List<CheckAndMax> checkAndMaxes = maxUpdates.computeIfAbsent(rowInfo.getTableName(), k -> new ArrayList<>());
checkAndMaxes.add(checkAndMax);
}

for (Map.Entry<TableName, List<CheckAndMax>> entry : maxUpdates.entrySet()) {
TableName tableName = entry.getKey();
List<CheckAndMax> maxs = entry.getValue();
List<List<CheckAndMax>> partition = ListUtils.partition(maxs, batchSize);
for (List<CheckAndMax> checkAndMaxes : partition) {
this.asyncTemplate.maxColumnValue(tableName, checkAndMaxes);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package com.navercorp.pinpoint.collector.dao.hbase.statistics;

import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;

import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -64,6 +63,8 @@ public Map<TableName, List<Increment>> createBulkIncrement(Map<RowInfo, Long> da
for (Map.Entry<RowKey, List<ColumnName>> rowKeyEntry : tableRowKeys.getValue().entrySet()) {
Increment increment = createIncrement(rowKeyEntry, rowKeyDistributorByHashPrefix);
incrementList.add(increment);


}
tableIncrementMap.put(tableName, incrementList);
}
Expand All @@ -74,9 +75,12 @@ private Increment createIncrement(Map.Entry<RowKey, List<ColumnName>> rowKeyEntr
RowKey rowKey = rowKeyEntry.getKey();
byte[] key = getRowKey(rowKey, rowKeyDistributorByHashPrefix);
final Increment increment = new Increment(key);
increment.setReturnResults(false);
for (ColumnName columnName : rowKeyEntry.getValue()) {
increment.addColumn(family, columnName.getColumnName(), columnName.getCallCount());
}


logger.trace("create increment row:{}, column:{}", rowKey, rowKeyEntry.getValue());
return increment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public void increment(RowKey rowKey, ColumnName columnName, long addition) {

TableName tableName = tableNameProvider.getTableName(this.tableDescriptor.getTable());
final byte[] rowKeyBytes = getDistributedKey(rowKey.getRowKey());

Increment increment = Increments.increment(rowKeyBytes, getColumnFamilyName(), columnName.getColumnName(), 1);
increment.setReturnResults(false);

this.hbaseTemplate.increment(tableName, increment);
}

Expand Down

0 comments on commit 0180c2b

Please sign in to comment.