Commit

[pinpoint-apm#11379] Add AsyncPollingPutWriter

emeroad committed Aug 21, 2024
1 parent 68d5133 commit b5ff6c5
Showing 12 changed files with 738 additions and 3 deletions.
24 changes: 23 additions & 1 deletion collector/src/main/resources/hbase-root.properties
@@ -27,11 +27,33 @@ hbase.client.properties.hbase.client.retries.number=4
hbase.client.put-writer.concurrency-limit=100000
hbase.client.span-put-writer.concurrency-limit=0

# asyncBufferedMutator, asyncTable
# asyncBufferedMutator, asyncTable, asyncPoller
hbase.client.put-writer=asyncBufferedMutator

hbase.client.put-writer.async-buffered-mutator.writeBufferSize=100
hbase.client.put-writer.async-buffered-mutator.writeBufferPeriodicFlush=100

## asyncPoller
# parallelism=0 : auto-detect the CPU core count
hbase.client.put-writer.async-poller.span.parallelism=0
# 1: all cores, n: 1/n of the cores (only applies when parallelism=0)
hbase.client.put-writer.async-poller.span.cpuRatio=1
# Minimum number of CPU cores (only applies when parallelism=0)
hbase.client.put-writer.async-poller.span.minCpuCore=4

hbase.client.put-writer.async-poller.span.queueSize=10000
hbase.client.put-writer.async-poller.span.writeBufferSize=100
hbase.client.put-writer.async-poller.span.writeBufferPeriodicFlush=100

# parallelism=0 : auto-detect the CPU core count
hbase.client.put-writer.async-poller.default.parallelism=0
# n: 1/n of the cores, 1: all cores, 4: 1/4 of the cores (only applies when parallelism=0)
hbase.client.put-writer.async-poller.default.cpuRatio=4
# Minimum number of CPU cores (only applies when parallelism=0)
hbase.client.put-writer.async-poller.default.minCpuCore=2
hbase.client.put-writer.async-poller.default.queueSize=5000
hbase.client.put-writer.async-poller.default.writeBufferSize=100
hbase.client.put-writer.async-poller.default.writeBufferPeriodicFlush=100
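
A worked example of how these three knobs interact (a minimal sketch: the 16-core host is an assumption, and the arithmetic mirrors AsyncPollerOption#cpu added in this commit):

// Effective parallelism for the 'default' poller settings above, when parallelism=0.
int cpuCount = 16;     // hypothetical 16-core collector host
int cpuRatio = 4;      // use 1/4 of the cores
int minCpuCore = 2;    // but never fewer than 2
int parallelism = Math.max(Math.floorDiv(cpuCount, cpuRatio), minCpuCore);  // = 4 poller threads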

# hbase async =================================================================
# enable hbase async operation. default: false
AsyncPollerOption.java (new file)
@@ -0,0 +1,88 @@
package com.navercorp.pinpoint.common.hbase.async;

import com.navercorp.pinpoint.common.util.CpuUtils;

/**
 * Configuration for the async poller put writer: queue capacity, write-buffer
 * flush policy, and poller-thread parallelism (derived from the CPU count when
 * parallelism is 0).
 */
public class AsyncPollerOption {

private int queueSize = 1000 * 100;

private int writeBufferSize = 100;
private int writeBufferPeriodicFlush = 100;
private int parallelism = 0;
private int cpuRatio = 1;
private int minCpuCore = 2;


public int getQueueSize() {
return queueSize;
}

public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}

public int getWriteBufferSize() {
return writeBufferSize;
}

public void setWriteBufferSize(int writeBufferSize) {
this.writeBufferSize = writeBufferSize;
}

public int getWriteBufferPeriodicFlush() {
return writeBufferPeriodicFlush;
}

public void setWriteBufferPeriodicFlush(int writeBufferPeriodicFlush) {
this.writeBufferPeriodicFlush = writeBufferPeriodicFlush;
}

    /**
     * Resolves the effective parallelism: a positive value is used as-is;
     * 0 (or a negative value) is replaced by max(cpuCount / cpuRatio, minCpuCore).
     */
    public int getParallelism() {
        return cpu(parallelism, cpuRatio, minCpuCore);
    }

    int cpu(int parallelism, int cpuRatio, int minCpu) {
        if (parallelism <= 0) {
            final int cpuCount = getCpuCount();
            int cpu = Math.floorDiv(cpuCount, cpuRatio);
            return Math.max(cpu, minCpu);
        }
        return parallelism;
    }

int getCpuCount() {
return CpuUtils.cpuCount();
}

public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}

public int getCpuRatio() {
return cpuRatio;
}

public void setCpuRatio(int cpuRatio) {
this.cpuRatio = cpuRatio;
}

public int getMinCpuCore() {
return minCpuCore;
}

public void setMinCpuCore(int minCpuCore) {
this.minCpuCore = minCpuCore;
}

@Override
public String toString() {
return "AsyncPollerOption{" +
"queueSize=" + queueSize +
", writeBufferSize=" + writeBufferSize +
", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush +
", parallelism=" + parallelism +
", cpuRatio=" + cpuRatio +
", minCpuCore=" + minCpuCore +
'}';
}
}
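
A minimal usage sketch for AsyncPollerOption, assuming the 'default' poller settings from hbase-root.properties above (the Spring wiring added elsewhere in this commit is not shown; the 16-core host is an assumption):

// Sketch: populate the option and resolve the effective poller parallelism.
AsyncPollerOption option = new AsyncPollerOption();
option.setQueueSize(5000);
option.setWriteBufferSize(100);
option.setWriteBufferPeriodicFlush(100);
option.setParallelism(0);      // 0 -> derive from CPU count
option.setCpuRatio(4);         // 1/4 of the cores
option.setMinCpuCore(2);       // floor of 2
int pollerThreads = option.getParallelism();  // e.g. max(16 / 4, 2) = 4 on a 16-core host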
AsyncPollerThread.java (new file)
@@ -0,0 +1,203 @@
package com.navercorp.pinpoint.common.hbase.async;

import com.navercorp.pinpoint.common.hbase.RequestNotPermittedException;
import com.navercorp.pinpoint.common.hbase.util.FutureUtils;
import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Single background poller thread: drains queued put requests, groups them by
 * table, and writes each batch through the supplied TableWriterFactory,
 * completing the per-put futures with the HBase results.
 */
public class AsyncPollerThread implements Closeable {

private final Logger logger = LogManager.getLogger(this.getClass());
private final ThrottledLogger tLogger = ThrottledLogger.getLogger(logger, 100);

private final TableWriterFactory writerFactory;

private final BlockingQueue<WriteRequest<Void>> queue;
private final int queueSize;
private final int writeBufferSize;
private final int writeBufferPeriodicFlush;
private final int pollTimeout;

private final Thread thread;
private final AtomicBoolean runState = new AtomicBoolean(true);

    // shared rejection exception returned when the write queue is full
    public static final RequestNotPermittedException OVERFLOW = new RequestNotPermittedException("write queue is full", false);

public AsyncPollerThread(String id, TableWriterFactory writerFactory,
AsyncPollerOption option) {
this.writerFactory = Objects.requireNonNull(writerFactory, "writerFactory");

this.queueSize = option.getQueueSize();
this.queue = new ArrayBlockingQueue<>(queueSize);

this.writeBufferSize = option.getWriteBufferSize();
this.writeBufferPeriodicFlush = option.getWriteBufferPeriodicFlush();
        // poll in slices of the periodic-flush interval, with a 20 ms floor
        this.pollTimeout = Math.max(writeBufferPeriodicFlush / 4, 20);

this.thread = new Thread(this::dispatch, id);
this.thread.setDaemon(true);
this.thread.start();
}

public List<CompletableFuture<Void>> write(TableName tableName, List<Put> puts) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(puts, "puts");
if (isShutdown()) {
return FutureUtils.newFutureList(() -> CompletableFuture.failedFuture(new IllegalStateException("closed")), puts.size());
}

WriteRequest<Void> writeRequest = new WriteRequest<>(tableName, puts);
if (this.queue.offer(writeRequest)) {
return writeRequest.getFutures();
}
tLogger.info("write queue overflow");
return FutureUtils.newFutureList(() -> CompletableFuture.failedFuture(OVERFLOW), puts.size());
}


    // thread main loop: drain batched requests from the queue and dispatch them per table
    public void dispatch() {
while (isRun()) {
try {
List<WriteRequest<Void>> requests = poll();
if (requests.isEmpty()) {
continue;
}

Map<TableName, List<WriteRequest<Void>>> map = tableGroup(requests);

for (Map.Entry<TableName, List<WriteRequest<Void>>> entry : map.entrySet()) {
TableName tableName = entry.getKey();
List<WriteRequest<Void>> writes = entry.getValue();

if (logger.isDebugEnabled()) {
logger.debug("write {} {} requests:{}", this.thread.getName(), tableName, writes.size());
}
List<Put> puts = getPuts(writes);

AsyncTableWriterFactory.Writer writer = this.writerFactory.newWriter(tableName);
List<CompletableFuture<Void>> hbaseResults = writer.batch(puts);
addListeners(hbaseResults, writes);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.debug("Thread.interrupted {}", this.thread.getName());
if (isShutdown()) {
break;
}
} catch (Throwable th) {
logger.warn("Error {}", this.thread.getName(), th);
if (isShutdown()) {
break;
}
}
}
logger.info("dispatch terminated {}", this.thread.getName());
}

private boolean isRun() {
return runState.get();
}

private boolean isShutdown() {
return !runState.get();
}

private Map<TableName, List<WriteRequest<Void>>> tableGroup(List<WriteRequest<Void>> requests) {
Map<TableName, List<WriteRequest<Void>>> map = new HashMap<>();
for (WriteRequest<Void> req : requests) {
TableName tableName = req.getTableName();
List<WriteRequest<Void>> puts = map.computeIfAbsent(tableName, (key) -> new ArrayList<>());
puts.add(req);
}
return map;
}

@Override
public void close() {
logger.debug("Close {}", this.thread.getName());
this.runState.set(false);
this.thread.interrupt();
try {
this.thread.join(3000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}



private List<Put> getPuts(List<WriteRequest<Void>> writes) {
List<Put> puts = new ArrayList<>(writes.size());
for (WriteRequest<Void> write : writes) {
puts.addAll(write.getPuts());
}
return puts;
}

    // hbaseResults is parallel to the flattened puts of all requests; pair futures by index
    private static <V> void addListeners(List<CompletableFuture<V>> hbaseResults, List<WriteRequest<V>> requests) {
        int i = 0;
for (WriteRequest<V> writeRequest : requests) {
for (CompletableFuture<V> write : writeRequest.getFutures()) {
CompletableFuture<V> hbaseFuture = hbaseResults.get(i++);
FutureUtils.addListener(hbaseFuture, write);
}
}
}

    private List<WriteRequest<Void>> poll() throws InterruptedException {
        final long startTime = System.currentTimeMillis();

        List<WriteRequest<Void>> drain = new ArrayList<>(writeBufferSize);
        int drainSize = 0;
        while (isRun()) {
            WriteRequest<Void> request = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
            if (request != null) {
                drain.add(request);
                drainSize += request.getPuts().size();
                // flush early once the accumulated puts reach the write buffer size
                if (bufferOverflow(drainSize)) {
                    return drain;
                }
            }
            // periodic flush: do not hold buffered puts longer than writeBufferPeriodicFlush
            if (drainSize > 0 && timeout(startTime)) {
                return drain;
            }
        }
        return drain;
    }

private boolean timeout(long startTime) {
return System.currentTimeMillis() - startTime > writeBufferPeriodicFlush;
}

private boolean bufferOverflow(int drainSize) {
return drainSize >= writeBufferSize;
}

@Override
public String toString() {
return "AsyncPollerThread{" +
", queueSize=" + queueSize +
", writeBufferSize=" + writeBufferSize +
", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush +
", pollTimeout=" + pollTimeout +
", thread=" + thread +
'}';
}
}
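
A hedged usage sketch of AsyncPollerThread; the writerFactory instance, the thread name "async-poller-0", the "AgentInfo" table name, and the row key are assumptions for illustration, not part of this commit:

// Sketch: submit puts to the poller and react to per-put completion.
// 'writerFactory' is assumed to be an existing TableWriterFactory wired elsewhere in the collector.
AsyncPollerThread poller = new AsyncPollerThread("async-poller-0", writerFactory, new AsyncPollerOption());

TableName table = TableName.valueOf("AgentInfo");                 // illustrative table name
List<Put> puts = List.of(new Put(Bytes.toBytes("rowKey")));        // illustrative row key
List<CompletableFuture<Void>> futures = poller.write(table, puts);
for (CompletableFuture<Void> future : futures) {
    future.whenComplete((ok, error) -> {
        if (error != null) {
            // queue overflow (RequestNotPermittedException), closed poller, or HBase write failure
        }
    });
}

poller.close();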