diff --git a/collector/src/main/resources/hbase-root.properties b/collector/src/main/resources/hbase-root.properties index 7914f45617ec2..da892bf728526 100644 --- a/collector/src/main/resources/hbase-root.properties +++ b/collector/src/main/resources/hbase-root.properties @@ -27,11 +27,33 @@ hbase.client.properties.hbase.client.retries.number=4 hbase.client.put-writer.concurrency-limit=100000 hbase.client.span-put-writer.concurrency-limit=0 -# asyncBufferedMutator, asyncTable +# asyncBufferedMutator, asyncTable, asyncPoller hbase.client.put-writer=asyncBufferedMutator + hbase.client.put-writer.async-buffered-mutator.writeBufferSize=100 hbase.client.put-writer.async-buffered-mutator.writeBufferPeriodicFlush=100 +## asyncPoller +# parallelism=0 : auto detect cpu core +hbase.client.put-writer.async-poller.span.parallelism=0 +# 1: all core, n: 1/n core (only work if parallelism=0) +hbase.client.put-writer.async-poller.span.cpuRatio=1 +# Minimum CPU core (only work if parallelism=0) +hbase.client.put-writer.async-poller.span.minCpuCore=4 + +hbase.client.put-writer.async-poller.span.queueSize=10000 +hbase.client.put-writer.async-poller.span.writeBufferSize=100 +hbase.client.put-writer.async-poller.span.writeBufferPeriodicFlush=100 + +# parallelism=0 : auto detect cpu core +hbase.client.put-writer.async-poller.default.parallelism=0 +# n: 1/n core, 1: all core, 4: 1/4 core (only work if parallelism=0) +hbase.client.put-writer.async-poller.default.cpuRatio=4 +# Minimum CPU core (only work if parallelism=0) +hbase.client.put-writer.async-poller.default.minCpuCore=2 +hbase.client.put-writer.async-poller.default.queueSize=5000 +hbase.client.put-writer.async-poller.default.writeBufferSize=100 +hbase.client.put-writer.async-poller.default.writeBufferPeriodicFlush=100 # hbase async ================================================================= # enable hbase async operation. default: false diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOption.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOption.java new file mode 100644 index 0000000000000..262115cccbbc9 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOption.java @@ -0,0 +1,84 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import com.navercorp.pinpoint.common.util.CpuUtils; + +public class AsyncPollerOption { + + private int queueSize = 1000 * 100; + + private int writeBufferSize = 100; + private int writeBufferPeriodicFlush = 100; + private int parallelism = 0; + private int cpuRatio = 1; + private int minCpuCore = 2; + + + public int getQueueSize() { + return queueSize; + } + + public void setQueueSize(int queueSize) { + this.queueSize = queueSize; + } + + public int getWriteBufferSize() { + return writeBufferSize; + } + + public void setWriteBufferSize(int writeBufferSize) { + this.writeBufferSize = writeBufferSize; + } + + public int getWriteBufferPeriodicFlush() { + return writeBufferPeriodicFlush; + } + + public void setWriteBufferPeriodicFlush(int writeBufferPeriodicFlush) { + this.writeBufferPeriodicFlush = writeBufferPeriodicFlush; + } + + public int getParallelism() { + return cpu(parallelism, cpuRatio, minCpuCore); + } + + static int cpu(int parallelism, int cpuRatio, int minCpu) { + if (parallelism <= 0) { + final int cpuCount = CpuUtils.cpuCount(); + int cpu = Math.floorDiv(cpuCount, cpuRatio); + return Math.max(cpu, minCpu); + } + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public int getCpuRatio() { + return cpuRatio; + } + + public void setCpuRatio(int cpuRatio) { + this.cpuRatio = cpuRatio; + } + + public int getMinCpuCore() { + return minCpuCore; + } + + public void setMinCpuCore(int minCpuCore) { + this.minCpuCore = minCpuCore; + } + + @Override + public String toString() { + return "AsyncPollerOption{" + + "queueSize=" + queueSize + + ", writeBufferSize=" + writeBufferSize + + ", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush + + ", parallelism=" + parallelism + + ", cpuRatio=" + cpuRatio + + ", minCpuCore=" + minCpuCore + + '}'; + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThread.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThread.java new file mode 100644 index 0000000000000..27484b16fe362 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThread.java @@ -0,0 +1,176 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import com.navercorp.pinpoint.common.hbase.RequestNotPermittedException; +import com.navercorp.pinpoint.common.hbase.util.FutureUtils; +import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class AsyncPollerThread implements Closeable { + + private final Logger logger = LogManager.getLogger(this.getClass()); + private final ThrottledLogger tLogger = ThrottledLogger.getLogger(logger, 100); + + private final TableWriterFactory writerFactory; + + private final BlockingQueue> queue; + private final int queueSize; + private final int writeBufferSize; + private final int writeBufferPeriodicFlush; + private final int pollTimeout; + + private final Thread thread; + + public static final RequestNotPermittedException OVERFLOW = new RequestNotPermittedException("write queue is full", false); + + public AsyncPollerThread(String id, TableWriterFactory writerFactory, + AsyncPollerOption option) { + this.writerFactory = Objects.requireNonNull(writerFactory, "writerFactory"); + + this.queueSize = option.getQueueSize(); + this.queue = new ArrayBlockingQueue<>(queueSize); + + this.writeBufferSize = option.getWriteBufferSize(); + this.writeBufferPeriodicFlush = option.getWriteBufferPeriodicFlush(); + this.pollTimeout = Math.max(writeBufferPeriodicFlush / 4, 20); + + this.thread = new Thread(this::dispatch, id); + this.thread.setDaemon(true); + this.thread.start(); + } + + public List> write(TableName tableName, List puts) { + Objects.requireNonNull(tableName, "tableName"); + Objects.requireNonNull(puts, "puts"); + + WriteRequest writeRequest = new WriteRequest<>(tableName, puts); + if (this.queue.offer(writeRequest)) { + return writeRequest.getFutures(); + } + tLogger.info("write queue overflow"); + return FutureUtils.newFutureList(() -> CompletableFuture.failedFuture(OVERFLOW), puts.size()); + } + + + public void dispatch() { + final Thread thread = this.thread; + while (!thread.isInterrupted()) { + try { + List> requests = poll(); + + Map>> map = tableGroup(requests); + + for (Map.Entry>> entry : map.entrySet()) { + TableName tableName = entry.getKey(); + List> writes = entry.getValue(); + + logger.info("write {} {} {}", thread.getName(), tableName, writes.size()); + + List puts = getPuts(writes); + + AsyncTableWriterFactory.Writer writer = this.writerFactory.newWriter(tableName); + List> hbaseResults = writer.batch(puts); + addListeners(hbaseResults, writes); + } + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + break; + } + } + } + + + private Map>> tableGroup(List> requests) { + Map>> map = new HashMap<>(); + for (WriteRequest req : requests) { + TableName tableName = req.getTableName(); + List> puts = map.computeIfAbsent(tableName, (key) -> new ArrayList<>()); + puts.add(req); + } + return map; + } + + @Override + public void close() { + this.thread.interrupt(); + try { + this.thread.join(3000); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + + + + private List getPuts(List> writes) { + List puts = new ArrayList<>(writes.size()); + for (WriteRequest write : writes) { + puts.addAll(write.getPuts()); + } + return puts; + } + + private static void addListeners(List> hbaseResults, List> requests) { + int i = 0; + for (WriteRequest writeRequest : requests) { + for (CompletableFuture write : writeRequest.getFutures()) { + CompletableFuture hbaseFuture = hbaseResults.get(i++); + FutureUtils.addListener(hbaseFuture, write); + } + } + } + + private List> poll() throws InterruptedException { + final long startTime = System.currentTimeMillis(); + + List> drain = new ArrayList<>(writeBufferSize); + int drainSize = 0; + while (true) { + WriteRequest request = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); + if (request != null) { + drain.add(request); + drainSize += request.getPuts().size(); + if (bufferOverflow(drainSize)) { + return drain; + } + } + if (drainSize > 0) { + if (timeout(startTime)) { + return drain; + } + } + } + } + + private boolean timeout(long startTime) { + return System.currentTimeMillis() - startTime > writeBufferPeriodicFlush; + } + + private boolean bufferOverflow(int drainSize) { + return drainSize >= writeBufferSize; + } + + @Override + public String toString() { + return "AsyncPollerThread{" + + ", queueSize=" + queueSize + + ", writeBufferSize=" + writeBufferSize + + ", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush + + ", pollTimeout=" + pollTimeout + + ", thread=" + thread + + '}'; + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollingPutWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollingPutWriter.java new file mode 100644 index 0000000000000..94a38b676f760 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollingPutWriter.java @@ -0,0 +1,83 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.Put; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class AsyncPollingPutWriter implements HbasePutWriter, Closeable { + + private final AsyncPollerThread[] pollers; + + + public AsyncPollingPutWriter(String name, AsyncConnection connection, AsyncPollerOption option) { + Objects.requireNonNull(option, "option"); + + TableWriterFactory factory = new AsyncTableWriterFactory(connection); + this.pollers = newAsyncWriteExecutors(name, factory, option); + } + + @SuppressWarnings("resource") + private AsyncPollerThread[] newAsyncWriteExecutors(String name, TableWriterFactory writerFactory, AsyncPollerOption option) { + final AsyncPollerThread[] pollers = new AsyncPollerThread[option.getParallelism()]; + for (int i = 0; i < pollers.length; i++) { + pollers[i] = new AsyncPollerThread(name + i, writerFactory, option); + } + return pollers; + } + + @Override + public CompletableFuture put(TableName tableName, Put put) { + Objects.requireNonNull(tableName, "tableName"); + Objects.requireNonNull(put, "put"); + + AsyncPollerThread writer = getExecutor(tableName, put); + + List> futures = writer.write(tableName, List.of(put)); + return futures.get(0); + } + + private AsyncPollerThread getExecutor(TableName tableName, Put put) { + final int mod = mod(tableName, put); + return pollers[mod]; + } + + private int mod(TableName tableName, Put put) { + int hashcode = tableName.hashCode() + Arrays.hashCode(put.getRow()); + return Math.abs(Math.floorMod(hashcode, pollers.length)); + } + + @Override + public List> put(TableName tableName, List puts) { + Objects.requireNonNull(tableName, "tableName"); + Objects.requireNonNull(puts, "puts"); + if (puts.isEmpty()) { + return List.of(CompletableFuture.completedFuture(null)); + } + + Put put = puts.get(0); + AsyncPollerThread writer = getExecutor(tableName, put); + + return writer.write(tableName, puts); + } + + @Override + public void close() throws IOException { + for (AsyncPollerThread writer : pollers) { + writer.close(); + } + } + + @Override + public String toString() { + return "AsyncPollingPutWriter{" + + "pollers=" + Arrays.toString(pollers) + + '}'; + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableWriterFactory.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableWriterFactory.java new file mode 100644 index 0000000000000..85a1be7283bbf --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableWriterFactory.java @@ -0,0 +1,21 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; + +import java.util.Objects; + +public class AsyncTableWriterFactory implements TableWriterFactory { + private final AsyncConnection connection; + + public AsyncTableWriterFactory(AsyncConnection connection) { + this.connection = Objects.requireNonNull(connection, "connection"); + } + + @Override + public Writer newWriter(TableName tableName) { + final AsyncTable table = connection.getTable(tableName); + return table::batch; + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/TableWriterFactory.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/TableWriterFactory.java new file mode 100644 index 0000000000000..a801ba556fef8 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/TableWriterFactory.java @@ -0,0 +1,16 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Row; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public interface TableWriterFactory { + Writer newWriter(TableName tableName); + + + interface Writer { + List> batch(List row); + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/WriteRequest.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/WriteRequest.java new file mode 100644 index 0000000000000..56cf8ed2150aa --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/WriteRequest.java @@ -0,0 +1,45 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import com.navercorp.pinpoint.common.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class WriteRequest { + + private final TableName tableName; + private final List puts; + private final List> futures; + + + public WriteRequest(TableName tableName, List puts) { + this.tableName = Objects.requireNonNull(tableName, "tableName"); + this.puts = Objects.requireNonNull(puts, "put"); + + this.futures = FutureUtils.newFutureList(CompletableFuture::new, puts.size()); + } + + private List> newFutureList(int size) { + final List> futures = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + futures.add(new CompletableFuture<>()); + } + return futures; + } + + public TableName getTableName() { + return tableName; + } + + public List getPuts() { + return puts; + } + + public List> getFutures() { + return futures; + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java index aa0cf16613487..886e917a57e6e 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java @@ -27,6 +27,8 @@ import com.navercorp.pinpoint.common.hbase.async.AsyncBufferedMutatorCustomizer; import com.navercorp.pinpoint.common.hbase.async.AsyncBufferedMutatorFactory; import com.navercorp.pinpoint.common.hbase.async.AsyncHbasePutWriter; +import com.navercorp.pinpoint.common.hbase.async.AsyncPollerOption; +import com.navercorp.pinpoint.common.hbase.async.AsyncPollingPutWriter; import com.navercorp.pinpoint.common.hbase.async.AsyncTableCustomizer; import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory; import com.navercorp.pinpoint.common.hbase.async.BatchAsyncHbasePutWriter; @@ -78,7 +80,9 @@ public AsyncTableCustomizer asyncTableCustomizer() { } @Bean - public AsyncTableFactory hbaseAsyncTableFactory(@Qualifier("hbaseAsyncConnection") AsyncConnection connection, AsyncTableCustomizer customizer) { + public AsyncTableFactory hbaseAsyncTableFactory(@Qualifier("hbaseAsyncConnection") + AsyncConnection connection, + AsyncTableCustomizer customizer) { return new HbaseAsyncTableFactory(connection, customizer); } @@ -89,7 +93,8 @@ public AsyncBufferedMutatorCustomizer asyncBufferedMutatorCustomizer() { } @Bean - public AsyncBufferedMutatorFactory hbaseAsyncBufferedMutatorFactory(@Qualifier("hbaseAsyncConnection") AsyncConnection connection, + public AsyncBufferedMutatorFactory hbaseAsyncBufferedMutatorFactory(@Qualifier("hbaseAsyncConnection") + AsyncConnection connection, AsyncBufferedMutatorCustomizer customizer) { logger.info("AsyncBufferedMutatorCustomizer {}", customizer); return new HbaseAsyncBufferedMutatorFactory(connection, customizer); @@ -228,6 +233,69 @@ private HbasePutWriter newPutWriter(AsyncBufferedMutatorFactory asyncTableFactor } } + @org.springframework.context.annotation.Configuration + @ConditionalOnProperty(name = "hbase.client.put-writer", havingValue = "asyncPoller") + public static class AsyncPollerPutWriterConfig { + private final Logger logger = LogManager.getLogger(AsyncPollerPutWriterConfig.class); + + public AsyncPollerPutWriterConfig() { + logger.info("Install {}", AsyncPollerPutWriterConfig.class.getSimpleName()); + } + + @ConfigurationProperties(prefix = "hbase.client.put-writer.async-poller.span") + @Bean + public AsyncPollerOption spanPollerOption() { + return new AsyncPollerOption(); + } + + @Primary + @Bean + public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncConnection") AsyncConnection connection, + @Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator, + @Qualifier("defaultPollerOption") + AsyncPollerOption option) { + + HbasePutWriter hbasePutWriter = newPollerWriter("asyncPoller-", connection, decorator, option); + logger.info("hbasePollerPutWriter {}", hbasePutWriter); + return hbasePutWriter; + } + + @ConfigurationProperties(prefix = "hbase.client.put-writer.async-poller.default") + @Bean + public AsyncPollerOption defaultPollerOption() { + return new AsyncPollerOption(); + } + + @Bean + public HbasePutWriterDecorator concurrencyDecorator(@Value("${hbase.client.put-writer.concurrency-limit:100000}") int concurrency) { + return new ConcurrencyDecorator(concurrency); + } + + @Bean + public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncConnection") AsyncConnection connection, + @Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator, + @Qualifier("defaultPollerOption") + AsyncPollerOption option) { + + HbasePutWriter hbasePutWriter = newPollerWriter("spanAsyncPoller-", connection, decorator, option); + logger.info("HbaseSpanPollerPutWriter {}", hbasePutWriter); + return hbasePutWriter; + } + + @Bean + public HbasePutWriterDecorator spanConcurrencyDecorator(@Value("${hbase.client.span-put-writer.concurrency-limit:1000000}") int concurrency) { + return new ConcurrencyDecorator(concurrency); + } + + private HbasePutWriter newPollerWriter(String name, AsyncConnection connection, + HbasePutWriterDecorator decorator, + AsyncPollerOption option) { + HbasePutWriter writer = new AsyncPollingPutWriter(name, connection, option); + HbasePutWriter putWriter = decorator.decorator(writer); + return new LoggingHbasePutWriter(putWriter); + } + } + @Bean public AdminFactory hbaseAdminFactory(@Qualifier("hbaseConnection") Connection connection) { return new HbaseAdminFactory(connection); diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/FutureUtils.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/FutureUtils.java new file mode 100644 index 0000000000000..4697c4386752f --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/util/FutureUtils.java @@ -0,0 +1,43 @@ +package com.navercorp.pinpoint.common.hbase.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Supplier; + +public final class FutureUtils { + private FutureUtils() { + } + + public static List> newFutureList(Supplier> supplier, int size) { + List> list = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + list.add(supplier.get()); + } + return list; + } + + public static void addListener(final CompletableFuture future, final CompletableFuture action) { + future.whenComplete((v, t) -> { + if (t != null) { + t = unwrapCompletionException(t); + action.completeExceptionally(t); + } else { + action.complete(v); + } + }); + } + + public static Throwable unwrapCompletionException(Throwable error) { + if (error instanceof CompletionException) { + Throwable cause = error.getCause(); + if (cause != null) { + return cause; + } + } + + return error; + } + +} diff --git a/commons-hbase/src/test/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOptionTest.java b/commons-hbase/src/test/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOptionTest.java new file mode 100644 index 0000000000000..d7f0ac43701d1 --- /dev/null +++ b/commons-hbase/src/test/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOptionTest.java @@ -0,0 +1,35 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import com.navercorp.pinpoint.common.util.CpuUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class AsyncPollerOptionTest { + + @Test + void getParallelism() { + AsyncPollerOption option = new AsyncPollerOption(); + Assertions.assertEquals(CpuUtils.cpuCount(), option.getParallelism()); + } + + + @Test + void getParallelism_min() { + AsyncPollerOption option = new AsyncPollerOption(); + option.setMinCpuCore(2); + option.setCpuRatio(100); + + Assertions.assertEquals(2, option.getParallelism()); + } + + @Test + void getParallelism_ratio() { + int cpu = CpuUtils.cpuCount(); + + AsyncPollerOption option = new AsyncPollerOption(); + option.setMinCpuCore(4); + option.setCpuRatio(2); + + Assertions.assertEquals(cpu/2, option.getParallelism()); + } +} \ No newline at end of file diff --git a/commons-hbase/src/test/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThreadTest.java b/commons-hbase/src/test/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThreadTest.java new file mode 100644 index 0000000000000..c913a3294daf7 --- /dev/null +++ b/commons-hbase/src/test/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThreadTest.java @@ -0,0 +1,66 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import com.navercorp.pinpoint.common.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.RepeatedTest; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +class AsyncPollerThreadTest { + + TableName table = TableName.valueOf("table"); + Random random = new Random(); + +// @Test + @RepeatedTest(5) + void write() throws ExecutionException, InterruptedException, TimeoutException { + TableWriterFactory factory = tableName -> this::batch; + + AsyncPollerOption option = new AsyncPollerOption(); + option.setQueueSize(100); + option.setWriteBufferSize(100); + option.setWriteBufferPeriodicFlush(200); + + AsyncPollerThread poller = new AsyncPollerThread("test", factory, option); + + Put put1 = new Put(nextBytes(8)); + Put put2 = new Put(nextBytes(9)); + Put put3 = new Put(nextBytes(10)); + + List> future1 = poller.write(this.table, List.of(put1)); + List> future2 = poller.write(this.table, List.of(put2, put3)); + + + Assertions.assertNull(awaitAndGet(future1, 0)); + + Assertions.assertNull(awaitAndGet(future2, 0)); + Assertions.assertNull(awaitAndGet(future2, 1)); + + + poller.close(); + } + + private Void awaitAndGet(List> listFuture, int index) throws InterruptedException, ExecutionException, TimeoutException { + CompletableFuture future = listFuture.get(index); + return future.get(1000, TimeUnit.MILLISECONDS); + } + + private byte[] nextBytes(int size) { + byte[] bytes = new byte[size]; + random.nextBytes(bytes); + return bytes; + } + + private List> batch(List list) { + return FutureUtils.newFutureList(() -> CompletableFuture.completedFuture(null), list.size()); + } +} \ No newline at end of file