From 2cfc2965dbb8d1ecb0fa9e15d829362b2f643078 Mon Sep 17 00:00:00 2001 From: emeroad Date: Wed, 14 Aug 2024 14:11:36 +0900 Subject: [PATCH] [#11350] Add write option to AsyncBufferedMutatorBuilder --- .../src/main/resources/hbase-root.properties | 5 ++ .../async/AsyncBufferedMutatorCustomizer.java | 7 +++ .../async/AsyncBufferedMutatorFactory.java | 13 +++++ .../hbase/async/AsyncTableCustomizer.java | 3 -- .../common/hbase/async/AsyncTableFactory.java | 5 -- .../hbase/async/BatchAsyncHbasePutWriter.java | 4 +- ...DefaultAsyncBufferedMutatorCustomizer.java | 53 +++++++++++++++++++ .../async/DefaultAsyncTableCustomizer.java | 11 ++-- .../HbaseAsyncBufferedMutatorFactory.java | 38 +++++++++++++ .../hbase/async/HbaseAsyncTableFactory.java | 19 ------- .../config/HbaseTemplateConfiguration.java | 31 ++++++++--- 11 files changed, 145 insertions(+), 44 deletions(-) create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncBufferedMutatorCustomizer.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncBufferedMutatorFactory.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java create mode 100644 commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java diff --git a/collector/src/main/resources/hbase-root.properties b/collector/src/main/resources/hbase-root.properties index 2fddeff4b0e2..7914f45617ec 100644 --- a/collector/src/main/resources/hbase-root.properties +++ b/collector/src/main/resources/hbase-root.properties @@ -27,6 +27,11 @@ hbase.client.properties.hbase.client.retries.number=4 hbase.client.put-writer.concurrency-limit=100000 hbase.client.span-put-writer.concurrency-limit=0 +# asyncBufferedMutator, asyncTable +hbase.client.put-writer=asyncBufferedMutator +hbase.client.put-writer.async-buffered-mutator.writeBufferSize=100 +hbase.client.put-writer.async-buffered-mutator.writeBufferPeriodicFlush=100 + # hbase async ================================================================= # enable hbase async operation. default: false diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncBufferedMutatorCustomizer.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncBufferedMutatorCustomizer.java new file mode 100644 index 000000000000..963f5dec1333 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncBufferedMutatorCustomizer.java @@ -0,0 +1,7 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder; + +public interface AsyncBufferedMutatorCustomizer { + void customize(AsyncBufferedMutatorBuilder builder); +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncBufferedMutatorFactory.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncBufferedMutatorFactory.java new file mode 100644 index 000000000000..be42dda73a5e --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncBufferedMutatorFactory.java @@ -0,0 +1,13 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncBufferedMutator; + +import java.util.concurrent.ExecutorService; + +public interface AsyncBufferedMutatorFactory { + + AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool); + + AsyncBufferedMutator getBufferedMutator(TableName tableName); +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableCustomizer.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableCustomizer.java index 1d26ed90b8d6..ef3c93853eba 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableCustomizer.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableCustomizer.java @@ -17,11 +17,8 @@ package com.navercorp.pinpoint.common.hbase.async; -import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder; import org.apache.hadoop.hbase.client.AsyncTableBuilder; public interface AsyncTableCustomizer { void customize(AsyncTableBuilder builder); - - void customize(AsyncBufferedMutatorBuilder builder); } diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableFactory.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableFactory.java index 4286e1b12809..0f4ebd2adf0e 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableFactory.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncTableFactory.java @@ -19,7 +19,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; -import org.apache.hadoop.hbase.client.AsyncBufferedMutator; import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.ScanResultConsumer; @@ -38,8 +37,4 @@ public interface AsyncTableFactory { */ AsyncTable getTable(TableName tableName, ExecutorService pool); - - AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool); - - AsyncBufferedMutator getBufferedMutator(TableName tableName); } diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/BatchAsyncHbasePutWriter.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/BatchAsyncHbasePutWriter.java index 62dcc0e37da5..1bb9c9e369d6 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/BatchAsyncHbasePutWriter.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/BatchAsyncHbasePutWriter.java @@ -26,9 +26,9 @@ import java.util.concurrent.CompletableFuture; public class BatchAsyncHbasePutWriter implements HbasePutWriter { - private final AsyncTableFactory asyncTableFactory; + private final AsyncBufferedMutatorFactory asyncTableFactory; - public BatchAsyncHbasePutWriter(AsyncTableFactory asyncTableFactory) { + public BatchAsyncHbasePutWriter(AsyncBufferedMutatorFactory asyncTableFactory) { this.asyncTableFactory = Objects.requireNonNull(asyncTableFactory, "asyncTableFactory"); } diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java new file mode 100644 index 000000000000..186d362f580d --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.navercorp.pinpoint.common.hbase.async; + +import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder; + +import java.util.concurrent.TimeUnit; + +public class DefaultAsyncBufferedMutatorCustomizer implements AsyncBufferedMutatorCustomizer { + + private long writeBufferSize = 100; + private long writeBufferPeriodicFlush = 100; + + public DefaultAsyncBufferedMutatorCustomizer() { + } + + public void setWriteBufferSize(long writeBufferSize) { + this.writeBufferSize = writeBufferSize; + } + + public void setWriteBufferPeriodicFlush(long writeBufferPeriodicFlush) { + this.writeBufferPeriodicFlush = writeBufferPeriodicFlush; + } + + @Override + public void customize(AsyncBufferedMutatorBuilder builder) { + builder.setWriteBufferSize(writeBufferSize); + builder.setWriteBufferPeriodicFlush(writeBufferPeriodicFlush, TimeUnit.MILLISECONDS); + } + + @Override + public String toString() { + return "DefaultAsyncBufferedMutatorCustomizer{" + + "writeBufferSize=" + writeBufferSize + + ", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush + + '}'; + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncTableCustomizer.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncTableCustomizer.java index a143d8b5214a..15c56ab480fb 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncTableCustomizer.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncTableCustomizer.java @@ -17,21 +17,16 @@ package com.navercorp.pinpoint.common.hbase.async; -import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder; import org.apache.hadoop.hbase.client.AsyncTableBuilder; -import java.util.concurrent.TimeUnit; - public class DefaultAsyncTableCustomizer implements AsyncTableCustomizer { - @Override - public void customize(AsyncTableBuilder builder) { - + public DefaultAsyncTableCustomizer() { } @Override - public void customize(AsyncBufferedMutatorBuilder builder) { + public void customize(AsyncTableBuilder builder) { - builder.setWriteBufferPeriodicFlush(500, TimeUnit.MILLISECONDS); } + } diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java new file mode 100644 index 000000000000..aae589cb4b21 --- /dev/null +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java @@ -0,0 +1,38 @@ +package com.navercorp.pinpoint.common.hbase.async; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncBufferedMutator; +import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.springframework.cache.annotation.Cacheable; + +import java.util.Objects; +import java.util.concurrent.ExecutorService; + +public class HbaseAsyncBufferedMutatorFactory implements AsyncBufferedMutatorFactory { + + private final AsyncConnection connection; + private final AsyncBufferedMutatorCustomizer customizer; + + public HbaseAsyncBufferedMutatorFactory(AsyncConnection connection, AsyncBufferedMutatorCustomizer customizer) { + this.connection = Objects.requireNonNull(connection, "connection"); + this.customizer = Objects.requireNonNull(customizer, "customizer"); + } + + + @Override + @Cacheable(cacheNames = "bufferedMutator-pool", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager") + public AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) { + AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName, pool); + customizer.customize(builder); + return builder.build(); + } + + @Override + @Cacheable(cacheNames = "bufferedMutator", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager") + public AsyncBufferedMutator getBufferedMutator(TableName tableName) { + AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName); + customizer.customize(builder); + return builder.build(); + } +} diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncTableFactory.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncTableFactory.java index 167c1b1d4a15..58ddd38f2937 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncTableFactory.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncTableFactory.java @@ -19,8 +19,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; -import org.apache.hadoop.hbase.client.AsyncBufferedMutator; -import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder; import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.AsyncTableBuilder; @@ -56,21 +54,4 @@ public AsyncTable getTable(TableName tableName, ExecutorServ return builder.build(); } - - @Override - @Cacheable(cacheNames = "bufferedMutator-pool", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager") - public AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) { - AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName, pool); - customizer.customize(builder); - return builder.build(); - } - - @Override - @Cacheable(cacheNames = "bufferedMutator", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager") - public AsyncBufferedMutator getBufferedMutator(TableName tableName) { - AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName); - customizer.customize(builder); - return builder.build(); - } - } diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java index 998508c0b732..aa0cf1661348 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java @@ -24,12 +24,16 @@ import com.navercorp.pinpoint.common.hbase.HbaseTemplate; import com.navercorp.pinpoint.common.hbase.HbaseVersionCheckBean; import com.navercorp.pinpoint.common.hbase.TableFactory; +import com.navercorp.pinpoint.common.hbase.async.AsyncBufferedMutatorCustomizer; +import com.navercorp.pinpoint.common.hbase.async.AsyncBufferedMutatorFactory; import com.navercorp.pinpoint.common.hbase.async.AsyncHbasePutWriter; import com.navercorp.pinpoint.common.hbase.async.AsyncTableCustomizer; import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory; import com.navercorp.pinpoint.common.hbase.async.BatchAsyncHbasePutWriter; import com.navercorp.pinpoint.common.hbase.async.ConcurrencyDecorator; +import com.navercorp.pinpoint.common.hbase.async.DefaultAsyncBufferedMutatorCustomizer; import com.navercorp.pinpoint.common.hbase.async.DefaultAsyncTableCustomizer; +import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncBufferedMutatorFactory; import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncCacheConfiguration; import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTableFactory; import com.navercorp.pinpoint.common.hbase.async.HbasePutWriter; @@ -78,6 +82,19 @@ public AsyncTableFactory hbaseAsyncTableFactory(@Qualifier("hbaseAsyncConnection return new HbaseAsyncTableFactory(connection, customizer); } + @Bean + @ConfigurationProperties(prefix = "hbase.client.put-writer.async-buffered-mutator") + public AsyncBufferedMutatorCustomizer asyncBufferedMutatorCustomizer() { + return new DefaultAsyncBufferedMutatorCustomizer(); + } + + @Bean + public AsyncBufferedMutatorFactory hbaseAsyncBufferedMutatorFactory(@Qualifier("hbaseAsyncConnection") AsyncConnection connection, + AsyncBufferedMutatorCustomizer customizer) { + logger.info("AsyncBufferedMutatorCustomizer {}", customizer); + return new HbaseAsyncBufferedMutatorFactory(connection, customizer); + } + @Bean @ConditionalOnProperty(name = "hbase.client.parallel.scan.enable", havingValue = "true") @@ -136,7 +153,7 @@ public AsyncHbasePutWriterConfig() { @Primary @Bean public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory, - @Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) { + @Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) { HbasePutWriter putWriter = newPutWriter(asyncTableFactory, decorator); logger.info("hbasePutWriter {}", putWriter); return putWriter; @@ -149,7 +166,7 @@ public HbasePutWriterDecorator concurrencyDecorator(@Value("${hbase.client.put-w @Bean public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory, - @Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) { + @Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) { HbasePutWriter putWriter = newPutWriter(asyncTableFactory, decorator); logger.info("hbaseSpanPutWriter {}", putWriter); return putWriter; @@ -179,8 +196,8 @@ public AsyncBufferedHbasePutWriterConfig() { @Primary @Bean - public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory, - @Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) { + public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncBufferedMutatorFactory") AsyncBufferedMutatorFactory asyncTableFactory, + @Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) { HbasePutWriter hbasePutWriter = newPutWriter(asyncTableFactory, decorator); logger.info("hbasePutWriter {}", hbasePutWriter); return hbasePutWriter; @@ -192,8 +209,8 @@ public HbasePutWriterDecorator concurrencyDecorator(@Value("${hbase.client.put-w } @Bean - public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory, - @Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) { + public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncBufferedMutatorFactory") AsyncBufferedMutatorFactory asyncTableFactory, + @Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) { HbasePutWriter hbasePutWriter = newPutWriter(asyncTableFactory, decorator); logger.info("HbaseSpanPutWriter {}", hbasePutWriter); return hbasePutWriter; @@ -204,7 +221,7 @@ public HbasePutWriterDecorator spanConcurrencyDecorator(@Value("${hbase.client.s return new ConcurrencyDecorator(concurrency); } - private HbasePutWriter newPutWriter(AsyncTableFactory asyncTableFactory, HbasePutWriterDecorator decorator) { + private HbasePutWriter newPutWriter(AsyncBufferedMutatorFactory asyncTableFactory, HbasePutWriterDecorator decorator) { HbasePutWriter writer = new BatchAsyncHbasePutWriter(asyncTableFactory); HbasePutWriter putWriter = decorator.decorator(writer); return new LoggingHbasePutWriter(putWriter);