Skip to content

Commit

Permalink
[#noissue] Cleanup deprecated Hbase API
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Nov 7, 2023
1 parent 30b9d56 commit 0858410
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Repository;

import java.util.Objects;
Expand Down Expand Up @@ -111,7 +111,7 @@ private Scan createScan(String agentId, long currentTime) {
scan.withStartRow(startKeyBytes);
scan.withStopRow(endKeyBytes);
scan.addFamily(DESCRIPTOR.getName());
scan.setMaxVersions(1);
scan.readVersions(1);
scan.setCaching(SCANNER_CACHING);

return scan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.List;
import java.util.Objects;

Expand All @@ -42,7 +43,7 @@ public AvgMaxLinkScheduler(@Qualifier("statisticsLinkScheduler") TaskScheduler s
@PostConstruct
public void linkScheduling() {
    // Kick off a periodic flush of avg/max link statistics for every registered DAO.
    // Uses the Duration-based overload; the long-millis variant is deprecated in Spring 6.
    for (CachedStatisticsDao dao : statisticsDaoList) {
        this.scheduler.scheduleWithFixedDelay(dao::flushAvgMax, Duration.ofMillis(1000));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.List;
import java.util.Objects;

Expand All @@ -42,7 +43,7 @@ public StatisticsLinkScheduler(@Qualifier("avgMaxLinkScheduler") TaskScheduler s
@PostConstruct
public void linkScheduling() {
    // Kick off a periodic flush of link statistics for every registered DAO.
    // Uses the Duration-based overload; the long-millis variant is deprecated in Spring 6.
    for (CachedStatisticsDao dao : statisticsDaoList) {
        this.scheduler.scheduleWithFixedDelay(dao::flushLink, Duration.ofMillis(1000));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
package com.navercorp.pinpoint.common.hbase;

import com.navercorp.pinpoint.common.util.CollectionUtils;

import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -162,6 +163,16 @@ public void put(TableName tableName, List<Put> puts) {
delegate.put(tableName, puts);
}

/** Forwards a single check-and-mutate request to the wrapped operations instance. */
@Override
public CheckAndMutateResult checkAndMutate(TableName tableName, CheckAndMutate checkAndMutate) {
    final CheckAndMutateResult result = delegate.checkAndMutate(tableName, checkAndMutate);
    return result;
}

/** Forwards a batch of check-and-mutate requests to the wrapped operations instance. */
@Override
public List<CheckAndMutateResult> checkAndMutate(TableName tableName, List<CheckAndMutate> checkAndMutates) {
    final List<CheckAndMutateResult> results = delegate.checkAndMutate(tableName, checkAndMutates);
    return results;
}

/**
* Atomically checks if a row/family/qualifier value matches the expected
* value. If it does, it adds the put. If the passed value is null, the check
Expand All @@ -177,7 +188,7 @@ public void put(TableName tableName, List<Put> puts) {
* @return true if the new put was executed, false otherwise
*/
@Override
public boolean checkAndPut(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) {
public boolean checkAndPut(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, CompareOperator compareOp, byte[] value, Put put) {
return delegate.checkAndPut(tableName, rowName, familyName, qualifier, compareOp, value, put);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
package com.navercorp.pinpoint.common.hbase;

import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;

import java.util.List;

Expand Down Expand Up @@ -79,6 +81,10 @@ public interface HbaseOperations2 {
void put(TableName tableName, final Put put);
void put(TableName tableName, final List<Put> puts);

CheckAndMutateResult checkAndMutate(TableName tableName, CheckAndMutate checkAndMutate);

List<CheckAndMutateResult> checkAndMutate(TableName tableName, List<CheckAndMutate> checkAndMutates);

/**
* Atomically checks if a row/family/qualifier value matches the expected
* value. If it does, it adds the put. If the passed value is null, the check
Expand All @@ -93,7 +99,7 @@ public interface HbaseOperations2 {
* @param put data to put if check succeeds
* @return true if the new put was executed, false otherwise
*/
boolean checkAndPut(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put);
boolean checkAndPut(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, CompareOperator compareOp, byte[] value, Put put);

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import com.navercorp.pinpoint.common.profiler.concurrent.ExecutorFactory;
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
import com.navercorp.pinpoint.common.util.StopWatch;

import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import com.sematext.hbase.wd.DistributedScanner;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
Expand All @@ -36,10 +38,9 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

Expand Down Expand Up @@ -80,6 +81,8 @@ public class HbaseTemplate2 extends HbaseAccessor implements HbaseOperations2, I

private HBaseAsyncOperation asyncOperation = DisabledHBaseAsyncOperation.INSTANCE;

private static final CheckAndMutateResult CHECK_AND_MUTATE_RESULT_FAILURE = new CheckAndMutateResult(false, null);

public HbaseTemplate2() {
}

Expand Down Expand Up @@ -311,45 +314,93 @@ public Object doInTable(Table table) throws Throwable {
});
}

/**
 * Executes a single check-and-mutate against the given table.
 * I/O failures are swallowed and reported as a failed result (best-effort semantics,
 * matching the pre-existing checkAndPut behavior which returned false on IOException).
 */
@Override
public CheckAndMutateResult checkAndMutate(TableName tableName, CheckAndMutate checkAndMutate) {
    // Typed callback avoids the raw TableCallback + unchecked cast of the original.
    return execute(tableName, new TableCallback<CheckAndMutateResult>() {
        @Override
        public CheckAndMutateResult doInTable(Table table) throws Throwable {
            try {
                return table.checkAndMutate(checkAndMutate);
            } catch (IOException e) {
                return CHECK_AND_MUTATE_RESULT_FAILURE;
            }
        }
    });
}

/**
 * Executes a batch of check-and-mutate operations against the given table.
 * On an I/O failure every request is reported as failed; the failure list is sized
 * to the request list (the original hard-coded a 2-element list, which is wrong for
 * any batch size other than two).
 */
@Override
public List<CheckAndMutateResult> checkAndMutate(TableName tableName, List<CheckAndMutate> checkAndMutates) {
    // Typed callback avoids the raw TableCallback + unchecked cast of the original.
    return execute(tableName, new TableCallback<List<CheckAndMutateResult>>() {
        @Override
        public List<CheckAndMutateResult> doInTable(Table table) throws Throwable {
            try {
                return table.checkAndMutate(checkAndMutates);
            } catch (IOException e) {
                // One failure result per request, not a fixed-size pair.
                return java.util.Collections.nCopies(checkAndMutates.size(), CHECK_AND_MUTATE_RESULT_FAILURE);
            }
        }
    });
}

/**
* Atomically checks if a row/family/qualifier value matches the expected
* value. If it does, it adds the put. If the passed value is null, the check
* is for the lack of column (ie: non-existence)
*
* @param tableName target table
* @param rowName to check
* @param familyName column family to check
* @param qualifier column qualifier to check
* @param compareOp comparison operator to use
* @param value the expected value
* @param put data to put if check succeeds
* @param rowName to check
* @param familyName column family to check
* @param qualifier column qualifier to check
* @param compareOp comparison operator to use
* @param value the expected value
* @param put data to put if check succeeds
* @return true if the new put was executed, false otherwise
*/
@Override
public boolean checkAndPut(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) {
return (boolean) execute(tableName, new TableCallback() {
public boolean checkAndPut(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, CompareOperator compareOp, byte[] value, Put put) {

CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(rowName)
.ifMatches(familyName, qualifier, compareOp, value)
.build(put);

CheckAndMutateResult result = this.checkAndMutate(tableName, checkAndMutate);
return result.isSuccess();
}

@Override
public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, long value) {
final byte[] valBytes = Bytes.toBytes(value);
Put put = new Put(rowName);
put.addColumn(familyName, qualifier, valBytes);

CheckAndMutate checkAndPut = CheckAndMutate.newBuilder(rowName)
.ifMatches(familyName, qualifier, CompareOperator.EQUAL, null)
.build(put);

this.execute(tableName, new TableCallback<Object>() {
@Override
public Object doInTable(Table table) throws Throwable {
try {
return table.checkAndPut(rowName, familyName, qualifier, compareOp, value, put);
} catch (IOException e) {
return Boolean.FALSE;
CheckAndMutateResult result = table.checkAndMutate(checkAndPut);
if (result.isSuccess()) {
logger.debug("MaxUpdate success for null");
return null;
}
CheckAndMutate checkAndMax = checkAndMax(rowName, familyName, qualifier, valBytes, put);
CheckAndMutateResult maxResult = table.checkAndMutate(checkAndMax);
if (maxResult.isSuccess()) {
logger.debug("MaxUpdate success for GREATER");
} else {
logger.trace("MaxUpdate failure for ConcurrentUpdate");
}
return null;
}
});
}

@Override
public void maxColumnValue(TableName tableName, byte[] rowName, byte[] familyName, byte[] qualifier, long value) {
byte[] valBytes = Bytes.toBytes(value);
Put put = new Put(rowName);
put.addColumn(familyName, qualifier, valBytes);
//check for existence and put for the first time
boolean success = checkAndPut(tableName, rowName, familyName, qualifier, CompareFilter.CompareOp.EQUAL, null, put);
if (!success) {
//check for max and put for update
checkAndPut(tableName, rowName, familyName, qualifier, CompareFilter.CompareOp.GREATER, valBytes, put);
}
private CheckAndMutate checkAndMax(byte[] rowName, byte[] familyName, byte[] qualifier, byte[] valBytes, Put put) {
return CheckAndMutate.newBuilder(rowName)
.ifMatches(familyName, qualifier, CompareOperator.GREATER, valBytes)
.build(put);
}

@Override
Expand Down Expand Up @@ -722,11 +773,11 @@ public long incrementColumnValue(TableName tableName, final byte[] rowName, fina
return execute(tableName, new TableCallback<Long>() {
@Override
public Long doInTable(Table table) throws Throwable {
return table.incrementColumnValue(rowName, familyName, qualifier, amount, writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
return table.incrementColumnValue(rowName, familyName, qualifier, amount, writeToWAL ? Durability.SKIP_WAL : Durability.USE_DEFAULT);
}
});
}

@Override
public <T> T execute(TableName tableName, TableCallback<T> action) {
Objects.requireNonNull(tableName, "tableName");
Expand All @@ -750,7 +801,7 @@ public <T> T execute(TableName tableName, TableCallback<T> action) {
releaseTable(table);
}
}

// Returns the table handle to the pooled table factory after a callback completes.
private void releaseTable(Table table) {
getTableFactory().releaseTable(table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,18 @@
import com.navercorp.pinpoint.common.server.bo.event.AgentEventBo;
import com.navercorp.pinpoint.common.server.bo.serializer.agent.AgentIdRowKeyEncoder;
import com.navercorp.pinpoint.common.server.util.AgentEventType;
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.web.dao.AgentEventDao;
import com.navercorp.pinpoint.common.server.util.time.Range;

import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

Expand Down Expand Up @@ -82,7 +81,7 @@ public List<AgentEventBo> getAgentEvents(String agentId, Range range, Set<AgentE
Objects.requireNonNull(range, "range");

Scan scan = new Scan();
scan.setMaxVersions(1);
scan.readVersions(1);
scan.setCaching(SCANNER_CACHE_SIZE);

scan.withStartRow(createRowKey(agentId, range.getTo()));
Expand All @@ -93,7 +92,7 @@ public List<AgentEventBo> getAgentEvents(String agentId, Range range, Set<AgentE
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
for (AgentEventType excludeEventType : excludeEventTypes) {
byte[] excludeQualifier = Bytes.toBytes(excludeEventType.getCode());
filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(excludeQualifier)));
filterList.addFilter(new QualifierFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(excludeQualifier)));
}
scan.setFilter(filterList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private Scan createScan(String agentId, long startTime, long endTime, AgentInfoC
scan.addColumn(family, DESCRIPTOR.QUALIFIER_JVM);
}

scan.setMaxVersions(1);
scan.readVersions(1);
scan.setCaching(SCANNER_CACHING);

return scan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,12 @@ private Scan createScan(String agentId, long fromTimestamp, long toTimestamp) {
byte[] startKeyBytes = agentIdEncoder.encodeRowKey(agentId, toTimestamp);
byte[] endKeyBytes = agentIdEncoder.encodeRowKey(agentId, fromTimestamp);

Scan scan = new Scan(startKeyBytes, endKeyBytes);
Scan scan = new Scan();
scan.withStartRow(startKeyBytes);
scan.withStopRow(endKeyBytes);

scan.addColumn(DESCRIPTOR.getName(), DESCRIPTOR.QUALIFIER_STATES);
scan.setMaxVersions(1);
scan.readVersions(1);
scan.setCaching(SCANNER_CACHING);

return scan;
Expand Down
Loading

0 comments on commit 0858410

Please sign in to comment.