Skip to content

Commit

Permalink
[pinpoint-apm#7559] Add Avg/Max stat
Browse files Browse the repository at this point in the history
  • Loading branch information
yjqg6666 authored and emeroad committed Jan 19, 2021
1 parent c12229e commit b39af45
Show file tree
Hide file tree
Showing 28 changed files with 655 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ResponseColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowInfo;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.hbase.TableDescriptor;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.TimeSlot;
Expand Down Expand Up @@ -110,14 +112,23 @@ public void received(String applicationName, ServiceType applicationServiceType,

final short slotNumber = ApplicationMapStatisticsUtils.getSlotNumber(applicationServiceType, elapsed, isError);
final ColumnName selfColumnName = new ResponseColumnName(agentId, slotNumber);

HistogramSchema histogramSchema = applicationServiceType.getHistogramSchema();
final ColumnName sumColumnName = new ResponseColumnName(agentId, histogramSchema.getSumStatSlot().getSlotTime());
final ColumnName maxColumnName = new ResponseColumnName(agentId, histogramSchema.getMaxStatSlot().getSlotTime());

if (useBulk) {
TableName mapStatisticsSelfTableName = descriptor.getTableName();
bulkIncrementer.increment(mapStatisticsSelfTableName, selfRowKey, selfColumnName);
bulkIncrementer.increment(mapStatisticsSelfTableName, selfRowKey, sumColumnName, elapsed);
bulkIncrementer.updateMax(mapStatisticsSelfTableName, selfRowKey, maxColumnName, elapsed);
} else {
final byte[] rowKey = getDistributedKey(selfRowKey.getRowKey());
// column name is the name of caller app.
byte[] columnName = selfColumnName.getColumnName();
increment(rowKey, columnName, 1L);
increment(rowKey, sumColumnName.getColumnName(), elapsed);
checkAndMax(rowKey, maxColumnName.getColumnName(), elapsed);
}
}

Expand All @@ -129,6 +140,13 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) {
hbaseTemplate.incrementColumnValue(mapStatisticsSelfTableName, rowKey, descriptor.getColumnFamilyName(), columnName, increment);
}

/**
 * Conditionally raises the stored value of the given column to {@code val}
 * via the template's max-update primitive. Arguments must be non-null.
 */
private void checkAndMax(byte[] rowKey, byte[] columnName, long val) {
    Objects.requireNonNull(rowKey, "rowKey");
    Objects.requireNonNull(columnName, "columnName");

    final TableName tableName = descriptor.getTableName();
    final byte[] family = descriptor.getColumnFamilyName();
    hbaseTemplate.maxColumnValue(tableName, rowKey, family, columnName, val);
}


@Override
public void flushAll() {
Expand All @@ -145,6 +163,13 @@ public void flushAll() {
}
hbaseTemplate.increment(tableName, increments);
}

Map<RowInfo, Long> maxUpdateMap = bulkIncrementer.getMaxUpdate();
for (RowInfo rowInfo : maxUpdateMap.keySet()) {
Long val = maxUpdateMap.get(rowInfo);
final byte[] rowKey = getDistributedKey(rowInfo.getRowKey().getRowKey());
checkAndMax(rowKey, rowInfo.getColumnName().getColumnName(), val);
}
}

private byte[] getDistributedKey(byte[] rowKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallerColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowInfo;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.hbase.TableDescriptor;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.TimeSlot;
Expand Down Expand Up @@ -115,15 +117,23 @@ public void update(String calleeApplicationName, ServiceType calleeServiceType,
final short callerSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(calleeServiceType, elapsed, isError);
final ColumnName callerColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, callerSlotNumber);

HistogramSchema histogramSchema = calleeServiceType.getHistogramSchema();
final ColumnName sumColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, histogramSchema.getSumStatSlot().getSlotTime());
final ColumnName maxColumnName = new CallerColumnName(callerServiceType.getCode(), callerApplicationName, callerHost, histogramSchema.getMaxStatSlot().getSlotTime());

if (useBulk) {
TableName mapStatisticsCallerTableName = descriptor.getTableName();
bulkIncrementer.increment(mapStatisticsCallerTableName, calleeRowKey, callerColumnName);
TableName tableName = descriptor.getTableName();
bulkIncrementer.increment(tableName, calleeRowKey, callerColumnName);
bulkIncrementer.increment(tableName, calleeRowKey, sumColumnName, elapsed);
bulkIncrementer.updateMax(tableName, calleeRowKey, maxColumnName, elapsed);
} else {
final byte[] rowKey = getDistributedKey(calleeRowKey.getRowKey());

// column name is the name of caller app.
byte[] columnName = callerColumnName.getColumnName();
increment(rowKey, columnName, 1L);
increment(rowKey, sumColumnName.getColumnName(), elapsed);
checkAndMax(rowKey, maxColumnName.getColumnName(), elapsed);
}
}

Expand All @@ -135,6 +145,13 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) {
hbaseTemplate.incrementColumnValue(mapStatisticsCallerTableName, rowKey, descriptor.getColumnFamilyName(), columnName, increment);
}

/**
 * Pushes {@code value} to HBase as a candidate maximum for the given
 * row/column; the template keeps whichever of the stored and supplied
 * values is larger.
 */
private void checkAndMax(byte[] rowKey, byte[] columnName, long value) {
    Objects.requireNonNull(rowKey, "rowKey");
    Objects.requireNonNull(columnName, "columnName");

    final byte[] family = descriptor.getColumnFamilyName();
    hbaseTemplate.maxColumnValue(descriptor.getTableName(), rowKey, family, columnName, value);
}

@Override
public void flushAll() {
if (!useBulk) {
Expand All @@ -152,6 +169,12 @@ public void flushAll() {
hbaseTemplate.increment(tableName, increments);
}

Map<RowInfo, Long> maxUpdateMap = bulkIncrementer.getMaxUpdate();
for (RowInfo rowInfo : maxUpdateMap.keySet()) {
Long val = maxUpdateMap.get(rowInfo);
final byte[] rowKey = getDistributedKey(rowInfo.getRowKey().getRowKey());
checkAndMax(rowKey, rowInfo.getColumnName().getColumnName(), val);
}
}

private byte[] getDistributedKey(byte[] rowKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.CalleeColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowInfo;
import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.hbase.TableDescriptor;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.trace.HistogramSchema;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.common.profiler.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.server.util.TimeSlot;
Expand Down Expand Up @@ -117,14 +119,22 @@ public void update(String callerApplicationName, ServiceType callerServiceType,

final short calleeSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(calleeServiceType, elapsed, isError);
final ColumnName calleeColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, calleeSlotNumber);

HistogramSchema histogramSchema = callerServiceType.getHistogramSchema();
final ColumnName sumColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, histogramSchema.getSumStatSlot().getSlotTime());
final ColumnName maxColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, histogramSchema.getMaxStatSlot().getSlotTime());
if (useBulk) {
TableName mapStatisticsCalleeTableName = descriptor.getTableName();
bulkIncrementer.increment(mapStatisticsCalleeTableName, callerRowKey, calleeColumnName);
TableName tableName = descriptor.getTableName();
bulkIncrementer.increment(tableName, callerRowKey, calleeColumnName);
bulkIncrementer.increment(tableName, callerRowKey, sumColumnName, elapsed);
bulkIncrementer.updateMax(tableName, callerRowKey, maxColumnName, elapsed);
} else {
final byte[] rowKey = getDistributedKey(callerRowKey.getRowKey());
// column name is the name of caller app.
byte[] columnName = calleeColumnName.getColumnName();
increment(rowKey, columnName, 1L);
increment(rowKey, sumColumnName.getColumnName(), elapsed);
checkAndMax(rowKey, maxColumnName.getColumnName(), elapsed);
}
}

Expand All @@ -136,6 +146,13 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) {
hbaseTemplate.incrementColumnValue(mapStatisticsCalleeTableName, rowKey, descriptor.getColumnFamilyName(), columnName, increment);
}

/**
 * Writes {@code value} into the given column only if it exceeds the value
 * currently stored there (delegated to the template's max-column primitive).
 */
private void checkAndMax(byte[] rowKey, byte[] columnName, long value) {
    Objects.requireNonNull(rowKey, "rowKey");
    Objects.requireNonNull(columnName, "columnName");

    final TableName table = descriptor.getTableName();
    hbaseTemplate.maxColumnValue(table, rowKey, descriptor.getColumnFamilyName(), columnName, value);
}

@Override
public void flushAll() {
if (!useBulk) {
Expand All @@ -152,6 +169,13 @@ public void flushAll() {
}
hbaseTemplate.increment(tableName, increments);
}

Map<RowInfo, Long> maxUpdateMap = bulkIncrementer.getMaxUpdate();
for (RowInfo rowInfo : maxUpdateMap.keySet()) {
Long val = maxUpdateMap.get(rowInfo);
final byte[] rowKey = getDistributedKey(rowInfo.getRowKey().getRowKey());
checkAndMax(rowKey, rowInfo.getColumnName().getColumnName(), val);
}
}

private byte[] getDistributedKey(byte[] rowKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,32 @@ public class BulkIncrementer {

private final AtomicLongMap<RowInfo> counter = AtomicLongMap.create();

private final AtomicLongMap<RowInfo> max = AtomicLongMap.create();

/**
 * Creates a bulk incrementer that buffers counts and max candidates in memory.
 *
 * @param rowKeyMerge strategy used to turn buffered counts into HBase increments; must not be null
 */
public BulkIncrementer(RowKeyMerge rowKeyMerge) {
this.rowKeyMerge = Objects.requireNonNull(rowKeyMerge, "rowKeyMerge");
}

/**
 * Buffers a single count (+1) for the given row/column.
 * Convenience overload of {@code increment(tableName, rowKey, columnName, addition)}.
 */
public void increment(TableName tableName, RowKey rowKey, ColumnName columnName) {
increment(tableName, rowKey, columnName, 1L);
}

/**
 * Buffers {@code addition} onto the running count for the given row/column.
 * The accumulated counts are drained later via {@code getIncrements}.
 */
public void increment(TableName tableName, RowKey rowKey, ColumnName columnName, long addition) {
    final RowInfo key = new DefaultRowInfo(tableName, rowKey, columnName);
    counter.addAndGet(key, addition);
}

/**
 * Records {@code value} as a candidate maximum for the given row/column; only
 * the largest value seen since the last {@link #getMaxUpdate()} drain is kept.
 *
 * NOTE: this must not touch {@code counter}. The previous implementation also
 * called {@code counter.incrementAndGet(rowInfo)}, which made the flush emit an
 * HBase Increment(+1) on the very column that later receives the conditional
 * max write, corrupting the stored maximum.
 */
public void updateMax(TableName tableName, RowKey rowKey, ColumnName columnName, long value) {
    final RowInfo rowInfo = new DefaultRowInfo(tableName, rowKey, columnName);
    max.accumulateAndGet(rowInfo, value, Long::max);
}

/**
 * Atomically drains all buffered counts and converts them into per-table
 * HBase Increment batches. After this call the internal counter is empty,
 * so each buffered count is flushed exactly once.
 *
 * @param rowKeyDistributor used to salt/distribute row keys when building increments
 * @return buffered increments grouped by table
 */
public Map<TableName, List<Increment>> getIncrements(RowKeyDistributorByHashPrefix rowKeyDistributor) {
final Map<RowInfo, Long> snapshot = AtomicLongMapUtils.remove(counter);
return rowKeyMerge.createBulkIncrement(snapshot, rowKeyDistributor);
}

/**
 * Atomically drains the buffered max candidates. After this call the internal
 * max map is empty; each entry is the largest value recorded for that row/column
 * since the previous drain.
 */
public Map<RowInfo, Long> getMaxUpdate() {
return AtomicLongMapUtils.remove(max);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,53 @@ public void setUp() {
when(rowKeyDistributor.getDistributedKey(any(byte[].class))).then(invocation -> invocation.getArgument(0));
}

@Test
public void maxUpdate() {
    // Given
    TableName table = TableName.valueOf("A");
    RowKey rowKey = new TestRowKey(0);
    ColumnName columnName = new TestColumnName(1);

    // When
    bulkIncrementer.updateMax(table, rowKey, columnName, 10);

    // Then: the single recorded value is reported as the maximum
    Map<RowInfo, Long> maxUpdate = bulkIncrementer.getMaxUpdate();
    long actual = maxUpdate.get(new DefaultRowInfo(table, rowKey, columnName));
    Assert.assertEquals("max", 10L, actual);
}

@Test
public void maxUpdate_max() {
    // Given
    TableName table = TableName.valueOf("A");
    RowKey rowKey = new TestRowKey(0);
    ColumnName columnName = new TestColumnName(1);

    // When: a larger value follows a smaller one
    bulkIncrementer.updateMax(table, rowKey, columnName, 10);
    bulkIncrementer.updateMax(table, rowKey, columnName, 20);

    // Then: the larger value wins
    Map<RowInfo, Long> maxUpdate = bulkIncrementer.getMaxUpdate();
    long actual = maxUpdate.get(new DefaultRowInfo(table, rowKey, columnName));
    Assert.assertEquals("max", 20L, actual);
}

@Test
public void maxUpdate_fail() {
    // Given
    TableName table = TableName.valueOf("A");
    RowKey rowKey = new TestRowKey(0);
    ColumnName columnName = new TestColumnName(1);

    // When: a smaller value follows a larger one
    bulkIncrementer.updateMax(table, rowKey, columnName, 10);
    bulkIncrementer.updateMax(table, rowKey, columnName, 0);

    // Then: the smaller value does not replace the recorded maximum
    Map<RowInfo, Long> maxUpdate = bulkIncrementer.getMaxUpdate();
    long actual = maxUpdate.get(new DefaultRowInfo(table, rowKey, columnName));
    Assert.assertEquals("max", 10L, actual);
}

@Test
public void singleTable() {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,27 @@ public void testIncrement() throws Exception {
}

@Test
public void testIntegerMax() throws Exception {
public void updateMax_update() throws Exception {
AtomicLongMap<String> cache = AtomicLongMap.create();
cache.addAndGet("a", 1L);
cache.addAndGet("a", 2L);
cache.addAndGet("b", 5L);

final String key = "a";

cache.put(key, 10);
long updated = cache.accumulateAndGet(key, 12, Long::max);

Assert.assertEquals(12, updated);
}

@Test
public void testIntegerMin() throws Exception {
public void updateMax_fail() throws Exception {
AtomicLongMap<String> cache = AtomicLongMap.create();
cache.addAndGet("a", 1L);
cache.addAndGet("a", 2L);
cache.addAndGet("b", 5L);

final String key = "a";

cache.put(key, 10);
long updated = cache.accumulateAndGet(key, 9, Long::max);

Assert.assertEquals(10, updated);
}

// @Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -160,6 +161,30 @@ public void put(TableName tableName, List<Put> puts) {
delegate.put(tableName, puts);
}

/**
 * Atomically checks whether a row/family/qualifier value matches the expected
 * value. If it does, the put is applied. If the passed value is null, the check
 * is for the lack of the column (i.e. non-existence). Delegates to the wrapped
 * operations instance.
 *
 * @param tableName target table
 * @param rowName row to check
 * @param familyName column family to check
 * @param qualifier column qualifier to check
 * @param compareOp comparison operator to use
 * @param value the expected value
 * @param put data to put if the check succeeds
 * @return true if the new put was executed, false otherwise
 */
@Override
public boolean checkAndPut(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) {
return delegate.checkAndPut(tableName, rowName, familyName, qualifier, compareOp, value, put);
}

/**
 * Delegates a max-column update to the wrapped operations instance.
 * NOTE(review): presumably keeps the larger of the stored and supplied value
 * for the given row/column — confirm against the delegate's implementation.
 */
@Override
public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, long value) {
delegate.maxColumnValue(tableName, rowName, familyName, qualifier, value);
}

@Override
public void delete(TableName tableName, Delete delete) {
delegate.delete(tableName, delete);
Expand Down
Loading

0 comments on commit b39af45

Please sign in to comment.