Skip to content

Commit

Permalink
[pinpoint-apm#11379] Add AsyncPollingPutWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 21, 2024
1 parent 68d5133 commit 63599b5
Show file tree
Hide file tree
Showing 11 changed files with 662 additions and 3 deletions.
24 changes: 23 additions & 1 deletion collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,33 @@ hbase.client.properties.hbase.client.retries.number=4
hbase.client.put-writer.concurrency-limit=100000
hbase.client.span-put-writer.concurrency-limit=0

# asyncBufferedMutator, asyncTable
# asyncBufferedMutator, asyncTable, asyncPoller
hbase.client.put-writer=asyncBufferedMutator

hbase.client.put-writer.async-buffered-mutator.writeBufferSize=100
hbase.client.put-writer.async-buffered-mutator.writeBufferPeriodicFlush=100

## asyncPoller
# parallelism=0: auto-detect from the number of CPU cores
hbase.client.put-writer.async-poller.span.parallelism=0
# 1: use all cores, n: use 1/n of the cores (only applies if parallelism=0)
hbase.client.put-writer.async-poller.span.cpuRatio=1
# Minimum number of CPU cores to use (only applies if parallelism=0)
hbase.client.put-writer.async-poller.span.minCpuCore=4

hbase.client.put-writer.async-poller.span.queueSize=10000
hbase.client.put-writer.async-poller.span.writeBufferSize=100
hbase.client.put-writer.async-poller.span.writeBufferPeriodicFlush=100

# parallelism=0: auto-detect from the number of CPU cores
hbase.client.put-writer.async-poller.default.parallelism=0
# 1: use all cores, n: use 1/n of the cores, e.g. 4: 1/4 of the cores (only applies if parallelism=0)
hbase.client.put-writer.async-poller.default.cpuRatio=4
# Minimum number of CPU cores to use (only applies if parallelism=0)
hbase.client.put-writer.async-poller.default.minCpuCore=2
hbase.client.put-writer.async-poller.default.queueSize=5000
hbase.client.put-writer.async-poller.default.writeBufferSize=100
hbase.client.put-writer.async-poller.default.writeBufferPeriodicFlush=100

# hbase async =================================================================
# enable hbase async operation. default: false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.navercorp.pinpoint.common.hbase.async;

import com.navercorp.pinpoint.common.util.CpuUtils;

/**
 * Mutable option bean for the asynchronous polling put writer.
 *
 * <p>Holds the queue capacity, the batching thresholds and the settings used to derive
 * the number of poller threads.
 */
public class AsyncPollerOption {

    /** Capacity of each poller's request queue. */
    private int queueSize = 1000 * 100;

    /** Batch size threshold at which a poller flushes what it has buffered. */
    private int writeBufferSize = 100;
    /** Flush interval; {@code AsyncPollerThread} compares it against elapsed milliseconds. */
    private int writeBufferPeriodicFlush = 100;
    /** Explicit number of pollers; {@code 0} (or less) means "derive from the CPU count". */
    private int parallelism = 0;
    /** Divisor applied to the CPU count when the parallelism is derived. */
    private int cpuRatio = 1;
    /** Lower bound for the derived parallelism. */
    private int minCpuCore = 2;


    public int getQueueSize() {
        return queueSize;
    }

    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    public int getWriteBufferSize() {
        return writeBufferSize;
    }

    public void setWriteBufferSize(int writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    public int getWriteBufferPeriodicFlush() {
        return writeBufferPeriodicFlush;
    }

    public void setWriteBufferPeriodicFlush(int writeBufferPeriodicFlush) {
        this.writeBufferPeriodicFlush = writeBufferPeriodicFlush;
    }

    /**
     * Returns the effective parallelism, not the raw configured value.
     *
     * @return the configured parallelism when it is positive, otherwise a value derived from
     *         the CPU count, {@code cpuRatio} and {@code minCpuCore}; always at least 1 in the
     *         derived case
     */
    public int getParallelism() {
        return cpu(parallelism, cpuRatio, minCpuCore);
    }

    /**
     * Resolves the number of poller threads.
     *
     * @param parallelism explicit thread count; used as-is when positive
     * @param cpuRatio    divisor applied to the CPU count; values below 1 are treated as 1
     * @param minCpu      lower bound for the derived value
     * @return {@code parallelism} if positive, otherwise
     *         {@code max(cpuCount / cpuRatio, minCpu, 1)}
     */
    static int cpu(int parallelism, int cpuRatio, int minCpu) {
        if (parallelism > 0) {
            return parallelism;
        }
        // A ratio of 0 would make floorDiv throw ArithmeticException and a negative ratio
        // would produce a negative thread count, so clamp misconfigured ratios to 1.
        final int ratio = Math.max(cpuRatio, 1);
        final int cpu = Math.floorDiv(CpuUtils.cpuCount(), ratio);
        // Never resolve to fewer than one thread: a zero-length poller array cannot
        // dispatch any request.
        return Math.max(Math.max(cpu, minCpu), 1);
    }

    public void setParallelism(int parallelism) {
        this.parallelism = parallelism;
    }

    public int getCpuRatio() {
        return cpuRatio;
    }

    public void setCpuRatio(int cpuRatio) {
        this.cpuRatio = cpuRatio;
    }

    public int getMinCpuCore() {
        return minCpuCore;
    }

    public void setMinCpuCore(int minCpuCore) {
        this.minCpuCore = minCpuCore;
    }

    /** Prints the raw configured values (the parallelism shown is not the resolved one). */
    @Override
    public String toString() {
        return "AsyncPollerOption{" +
                "queueSize=" + queueSize +
                ", writeBufferSize=" + writeBufferSize +
                ", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush +
                ", parallelism=" + parallelism +
                ", cpuRatio=" + cpuRatio +
                ", minCpuCore=" + minCpuCore +
                '}';
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package com.navercorp.pinpoint.common.hbase.async;

import com.navercorp.pinpoint.common.hbase.RequestNotPermittedException;
import com.navercorp.pinpoint.common.hbase.util.FutureUtils;
import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * A single daemon thread that drains a bounded queue of {@link WriteRequest}s, groups them
 * by table and hands each group to a writer obtained from the {@link TableWriterFactory}.
 *
 * <p>Producers call {@link #write(TableName, List)}, which never blocks: when the queue is
 * full the returned futures are failed with {@link #OVERFLOW}.
 *
 * <p>The thread is started by the constructor and stopped by {@link #close()}.
 */
public class AsyncPollerThread implements Closeable {

    private final Logger logger = LogManager.getLogger(this.getClass());
    // NOTE(review): the second argument is presumably the throttling ratio - confirm in ThrottledLogger.
    private final ThrottledLogger tLogger = ThrottledLogger.getLogger(logger, 100);

    private final TableWriterFactory writerFactory;

    private final BlockingQueue<WriteRequest<Void>> queue;
    private final int queueSize;
    private final int writeBufferSize;
    private final int writeBufferPeriodicFlush;
    private final int pollTimeout;

    private final Thread thread;

    /** Shared exception instance used to fail requests rejected because the queue is full. */
    public static final RequestNotPermittedException OVERFLOW = new RequestNotPermittedException("write queue is full", false);

    /**
     * Creates the poller and immediately starts its daemon thread.
     *
     * @param id            name given to the poller thread
     * @param writerFactory factory used to obtain a writer per table; must not be null
     * @param option        queue and batching settings; must not be null
     */
    public AsyncPollerThread(String id, TableWriterFactory writerFactory,
                             AsyncPollerOption option) {
        this.writerFactory = Objects.requireNonNull(writerFactory, "writerFactory");
        Objects.requireNonNull(option, "option");

        this.queueSize = option.getQueueSize();
        this.queue = new ArrayBlockingQueue<>(queueSize);

        this.writeBufferSize = option.getWriteBufferSize();
        this.writeBufferPeriodicFlush = option.getWriteBufferPeriodicFlush();
        // Poll in slices of a quarter of the flush period, but never shorter than 20 ms.
        this.pollTimeout = Math.max(writeBufferPeriodicFlush / 4, 20);

        this.thread = new Thread(this::dispatch, id);
        this.thread.setDaemon(true);
        this.thread.start();
    }

    /**
     * Enqueues the puts as one request without blocking.
     *
     * @param tableName target table; must not be null
     * @param puts      puts to write; must not be null
     * @return the request's futures when it was accepted; otherwise a list created by
     *         {@code FutureUtils.newFutureList} from futures failed with {@link #OVERFLOW},
     *         requested with {@code puts.size()} entries
     */
    public List<CompletableFuture<Void>> write(TableName tableName, List<Put> puts) {
        Objects.requireNonNull(tableName, "tableName");
        Objects.requireNonNull(puts, "puts");

        WriteRequest<Void> writeRequest = new WriteRequest<>(tableName, puts);
        if (this.queue.offer(writeRequest)) {
            return writeRequest.getFutures();
        }
        tLogger.info("write queue overflow");
        return FutureUtils.newFutureList(() -> CompletableFuture.failedFuture(OVERFLOW), puts.size());
    }


    /**
     * Poller loop: repeatedly collects a batch of requests, groups it by table and submits
     * each group. Runs until the poller thread is interrupted.
     *
     * <p>A failure while submitting one table group fails only that group's futures; the
     * loop keeps running. When the loop exits, every request still waiting in the queue is
     * cancelled so that callers are not left with futures that never complete.
     */
    public void dispatch() {
        final Thread thread = this.thread;
        try {
            while (!thread.isInterrupted()) {
                try {
                    List<WriteRequest<Void>> requests = poll();

                    Map<TableName, List<WriteRequest<Void>>> map = tableGroup(requests);

                    for (Map.Entry<TableName, List<WriteRequest<Void>>> entry : map.entrySet()) {
                        writeTable(entry.getKey(), entry.getValue());
                    }
                } catch (InterruptedException ignored) {
                    // Restore the flag for whoever inspects it next and stop polling.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            cancelQueued();
        }
    }

    /**
     * Submits all requests of one table as a single batch and links the results back to the
     * callers' futures. Any runtime failure is reported to the affected futures instead of
     * propagating, because an uncaught exception would terminate the poller thread.
     */
    private void writeTable(TableName tableName, List<WriteRequest<Void>> writes) {
        try {
            // Per-batch trace; kept at debug level because this runs on the hot path.
            logger.debug("write {} {} {}", thread.getName(), tableName, writes.size());

            List<Put> puts = getPuts(writes);

            AsyncTableWriterFactory.Writer writer = this.writerFactory.newWriter(tableName);
            List<CompletableFuture<Void>> hbaseResults = writer.batch(puts);
            addListeners(hbaseResults, writes);
        } catch (RuntimeException e) {
            logger.warn("write failed. table:{} requests:{}", tableName, writes.size(), e);
            // No effect on futures that were already completed or linked successfully.
            failAll(writes, e);
        }
    }

    /** Groups the requests by their target table, keeping the order within each table. */
    private Map<TableName, List<WriteRequest<Void>>> tableGroup(List<WriteRequest<Void>> requests) {
        Map<TableName, List<WriteRequest<Void>>> map = new HashMap<>();
        for (WriteRequest<Void> req : requests) {
            TableName tableName = req.getTableName();
            List<WriteRequest<Void>> puts = map.computeIfAbsent(tableName, (key) -> new ArrayList<>());
            puts.add(req);
        }
        return map;
    }

    /**
     * Interrupts the poller thread and waits up to 3 seconds for it to finish.
     *
     * <p>Requests still queued when the thread stops are cancelled by the poller thread.
     * Requests offered concurrently with, or after, this call are not guaranteed to be
     * completed.
     */
    @Override
    public void close() {
        this.thread.interrupt();
        try {
            this.thread.join(3000);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

    /** Flattens the puts of all requests into one list, in request order. */
    private List<Put> getPuts(List<WriteRequest<Void>> writes) {
        List<Put> puts = new ArrayList<>(writes.size());
        for (WriteRequest<Void> write : writes) {
            puts.addAll(write.getPuts());
        }
        return puts;
    }

    /**
     * Links the i-th batch result to the i-th caller future, walking the requests in the
     * same order in which {@link #getPuts(List)} flattened them.
     */
    private static <V> void addListeners(List<CompletableFuture<V>> hbaseResults, List<WriteRequest<V>> requests) {
        int i = 0;
        for (WriteRequest<V> writeRequest : requests) {
            for (CompletableFuture<V> write : writeRequest.getFutures()) {
                CompletableFuture<V> hbaseFuture = hbaseResults.get(i++);
                FutureUtils.addListener(hbaseFuture, write);
            }
        }
    }

    /** Completes every still-pending future of the given requests with {@code cause}. */
    private static <V> void failAll(List<WriteRequest<V>> requests, Throwable cause) {
        for (WriteRequest<V> request : requests) {
            for (CompletableFuture<V> future : request.getFutures()) {
                future.completeExceptionally(cause);
            }
        }
    }

    /** Cancels every still-pending future of the given requests. */
    private static <V> void cancelAll(List<WriteRequest<V>> requests) {
        for (WriteRequest<V> request : requests) {
            for (CompletableFuture<V> future : request.getFutures()) {
                future.cancel(false);
            }
        }
    }

    /** Removes everything currently in the queue and cancels it; used when the loop exits. */
    private void cancelQueued() {
        List<WriteRequest<Void>> pending = new ArrayList<>();
        this.queue.drainTo(pending);
        if (!pending.isEmpty()) {
            logger.info("cancel pending write requests {} {}", thread.getName(), pending.size());
            cancelAll(pending);
        }
    }

    /**
     * Blocks until a batch is ready.
     *
     * <p>A batch is returned as soon as the buffered puts reach {@code writeBufferSize}, or
     * when at least one put is buffered and more than {@code writeBufferPeriodicFlush}
     * milliseconds have elapsed since this method was entered. An empty batch is never
     * returned.
     *
     * @throws InterruptedException if the thread is interrupted while waiting; requests that
     *                              were already taken from the queue are cancelled first
     */
    private List<WriteRequest<Void>> poll() throws InterruptedException {
        // Monotonic clock: unaffected by wall-clock adjustments.
        final long startNanos = System.nanoTime();

        List<WriteRequest<Void>> drain = new ArrayList<>(writeBufferSize);
        int drainSize = 0;
        try {
            while (true) {
                WriteRequest<Void> request = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
                if (request != null) {
                    drain.add(request);
                    drainSize += request.getPuts().size();
                    if (bufferOverflow(drainSize)) {
                        return drain;
                    }
                }
                if (drainSize > 0) {
                    if (timeout(startNanos)) {
                        return drain;
                    }
                }
            }
        } catch (InterruptedException e) {
            // These requests have left the queue, so nobody else can complete them.
            cancelAll(drain);
            throw e;
        }
    }

    /** True when more than the periodic flush interval has elapsed since {@code startNanos}. */
    private boolean timeout(long startNanos) {
        return System.nanoTime() - startNanos > TimeUnit.MILLISECONDS.toNanos(writeBufferPeriodicFlush);
    }

    /** True when the number of buffered puts has reached the configured buffer size. */
    private boolean bufferOverflow(int drainSize) {
        return drainSize >= writeBufferSize;
    }

    @Override
    public String toString() {
        return "AsyncPollerThread{" +
                "queueSize=" + queueSize +
                ", writeBufferSize=" + writeBufferSize +
                ", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush +
                ", pollTimeout=" + pollTimeout +
                ", thread=" + thread +
                '}';
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Put;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * {@link HbasePutWriter} that distributes puts over a fixed set of {@link AsyncPollerThread}s.
 *
 * <p>The poller is selected from the hash of the table name and the row key, so the same
 * table/row combination is always handled by the same poller.
 */
public class AsyncPollingPutWriter implements HbasePutWriter, Closeable {

    private final AsyncPollerThread[] pollers;


    /**
     * Creates the writer and starts {@code option.getParallelism()} pollers.
     *
     * @param name       prefix of the poller thread names; the poller index is appended
     * @param connection connection handed to the {@link AsyncTableWriterFactory}
     * @param option     poller settings; must not be null
     * @throws IllegalArgumentException if the resolved parallelism is not positive
     */
    public AsyncPollingPutWriter(String name, AsyncConnection connection, AsyncPollerOption option) {
        Objects.requireNonNull(option, "option");

        TableWriterFactory factory = new AsyncTableWriterFactory(connection);
        this.pollers = newAsyncWriteExecutors(name, factory, option);
    }

    @SuppressWarnings("resource")
    private AsyncPollerThread[] newAsyncWriteExecutors(String name, TableWriterFactory writerFactory, AsyncPollerOption option) {
        final int parallelism = option.getParallelism();
        // Fail fast: with an empty poller array every put would later fail with
        // "ArithmeticException: / by zero" while selecting a poller.
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive. option:" + option);
        }
        final AsyncPollerThread[] pollers = new AsyncPollerThread[parallelism];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new AsyncPollerThread(name + i, writerFactory, option);
        }
        return pollers;
    }

    /**
     * Queues a single put on the poller selected for its table and row.
     *
     * @return the future of the queued put
     */
    @Override
    public CompletableFuture<Void> put(TableName tableName, Put put) {
        Objects.requireNonNull(tableName, "tableName");
        Objects.requireNonNull(put, "put");

        AsyncPollerThread writer = getExecutor(tableName, put);

        List<CompletableFuture<Void>> futures = writer.write(tableName, List.of(put));
        return futures.get(0);
    }

    private AsyncPollerThread getExecutor(TableName tableName, Put put) {
        final int mod = mod(tableName, put);
        return pollers[mod];
    }

    /** Maps the table/row hash onto a poller index in {@code [0, pollers.length)}. */
    private int mod(TableName tableName, Put put) {
        int hashcode = tableName.hashCode() + Arrays.hashCode(put.getRow());
        // floorMod with a positive divisor is never negative, even if the sum overflowed.
        return Math.floorMod(hashcode, pollers.length);
    }

    /**
     * Queues all puts as one request on the poller selected by the first put.
     *
     * @return the futures of the queued request; for an empty {@code puts} list a
     *         single already-completed future
     */
    @Override
    public List<CompletableFuture<Void>> put(TableName tableName, List<Put> puts) {
        Objects.requireNonNull(tableName, "tableName");
        Objects.requireNonNull(puts, "puts");
        if (puts.isEmpty()) {
            return List.of(CompletableFuture.completedFuture(null));
        }

        // The whole list is routed by its first element so it stays one request.
        Put put = puts.get(0);
        AsyncPollerThread writer = getExecutor(tableName, put);

        return writer.write(tableName, puts);
    }

    /** Closes every poller in turn. */
    @Override
    public void close() throws IOException {
        for (AsyncPollerThread writer : pollers) {
            writer.close();
        }
    }

    @Override
    public String toString() {
        return "AsyncPollingPutWriter{" +
                "pollers=" + Arrays.toString(pollers) +
                '}';
    }
}
Loading

0 comments on commit 63599b5

Please sign in to comment.