Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#11350] Add write option to AsyncBufferedMutatorBuilder #11351

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ hbase.client.properties.hbase.client.retries.number=4
hbase.client.put-writer.concurrency-limit=100000
hbase.client.span-put-writer.concurrency-limit=0

# asyncBufferedMutator, asyncTable
hbase.client.put-writer=asyncBufferedMutator
hbase.client.put-writer.async-buffered-mutator.writeBufferSize=100
hbase.client.put-writer.async-buffered-mutator.writeBufferPeriodicFlush=100


# hbase async =================================================================
# enable hbase async operation. default: false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;

public interface AsyncBufferedMutatorCustomizer {
void customize(AsyncBufferedMutatorBuilder builder);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;

import java.util.concurrent.ExecutorService;

public interface AsyncBufferedMutatorFactory {

AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool);

AsyncBufferedMutator getBufferedMutator(TableName tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncTableBuilder;

public interface AsyncTableCustomizer {
void customize(AsyncTableBuilder<?> builder);

void customize(AsyncBufferedMutatorBuilder builder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ScanResultConsumer;

Expand All @@ -38,8 +37,4 @@ public interface AsyncTableFactory {
*/
AsyncTable<ScanResultConsumer> getTable(TableName tableName, ExecutorService pool);


AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool);

AsyncBufferedMutator getBufferedMutator(TableName tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.util.concurrent.CompletableFuture;

public class BatchAsyncHbasePutWriter implements HbasePutWriter {
private final AsyncTableFactory asyncTableFactory;
private final AsyncBufferedMutatorFactory asyncTableFactory;

public BatchAsyncHbasePutWriter(AsyncTableFactory asyncTableFactory) {
public BatchAsyncHbasePutWriter(AsyncBufferedMutatorFactory asyncTableFactory) {

Check warning on line 31 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/BatchAsyncHbasePutWriter.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/BatchAsyncHbasePutWriter.java#L31

Added line #L31 was not covered by tests
this.asyncTableFactory = Objects.requireNonNull(asyncTableFactory, "asyncTableFactory");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;

import java.util.concurrent.TimeUnit;

public class DefaultAsyncBufferedMutatorCustomizer implements AsyncBufferedMutatorCustomizer {

private long writeBufferSize = 100;
private long writeBufferPeriodicFlush = 100;

Check warning on line 27 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java#L26-L27

Added lines #L26 - L27 were not covered by tests

public DefaultAsyncBufferedMutatorCustomizer() {
}

Check warning on line 30 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java#L29-L30

Added lines #L29 - L30 were not covered by tests

public void setWriteBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
}

Check warning on line 34 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java#L33-L34

Added lines #L33 - L34 were not covered by tests

public void setWriteBufferPeriodicFlush(long writeBufferPeriodicFlush) {
this.writeBufferPeriodicFlush = writeBufferPeriodicFlush;
}

Check warning on line 38 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java#L37-L38

Added lines #L37 - L38 were not covered by tests

@Override
public void customize(AsyncBufferedMutatorBuilder builder) {
builder.setWriteBufferSize(writeBufferSize);
builder.setWriteBufferPeriodicFlush(writeBufferPeriodicFlush, TimeUnit.MILLISECONDS);
}

Check warning on line 44 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java#L42-L44

Added lines #L42 - L44 were not covered by tests

@Override
public String toString() {
return "DefaultAsyncBufferedMutatorCustomizer{" +

Check warning on line 48 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncBufferedMutatorCustomizer.java#L48

Added line #L48 was not covered by tests
"writeBufferSize=" + writeBufferSize +
", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@

package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncTableBuilder;

import java.util.concurrent.TimeUnit;

public class DefaultAsyncTableCustomizer implements AsyncTableCustomizer {

@Override
public void customize(AsyncTableBuilder<?> builder) {

public DefaultAsyncTableCustomizer() {

Check warning on line 24 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncTableCustomizer.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/DefaultAsyncTableCustomizer.java#L24

Added line #L24 was not covered by tests
}

@Override
public void customize(AsyncBufferedMutatorBuilder builder) {
public void customize(AsyncTableBuilder<?> builder) {

builder.setWriteBufferPeriodicFlush(500, TimeUnit.MILLISECONDS);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.springframework.cache.annotation.Cacheable;

import java.util.Objects;
import java.util.concurrent.ExecutorService;

public class HbaseAsyncBufferedMutatorFactory implements AsyncBufferedMutatorFactory {

private final AsyncConnection connection;
private final AsyncBufferedMutatorCustomizer customizer;

public HbaseAsyncBufferedMutatorFactory(AsyncConnection connection, AsyncBufferedMutatorCustomizer customizer) {
this.connection = Objects.requireNonNull(connection, "connection");
this.customizer = Objects.requireNonNull(customizer, "customizer");
}

Check warning on line 20 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java#L17-L20

Added lines #L17 - L20 were not covered by tests


@Override
@Cacheable(cacheNames = "bufferedMutator-pool", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager")
public AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) {
AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName, pool);
customizer.customize(builder);
return builder.build();

Check warning on line 28 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java#L26-L28

Added lines #L26 - L28 were not covered by tests
}

@Override
@Cacheable(cacheNames = "bufferedMutator", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager")
public AsyncBufferedMutator getBufferedMutator(TableName tableName) {
AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName);
customizer.customize(builder);
return builder.build();

Check warning on line 36 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/HbaseAsyncBufferedMutatorFactory.java#L34-L36

Added lines #L34 - L36 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.AsyncTableBuilder;
Expand Down Expand Up @@ -56,21 +54,4 @@ public AsyncTable<ScanResultConsumer> getTable(TableName tableName, ExecutorServ
return builder.build();
}


@Override
@Cacheable(cacheNames = "bufferedMutator-pool", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager")
public AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) {
AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName, pool);
customizer.customize(builder);
return builder.build();
}

@Override
@Cacheable(cacheNames = "bufferedMutator", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager")
public AsyncBufferedMutator getBufferedMutator(TableName tableName) {
AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName);
customizer.customize(builder);
return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import com.navercorp.pinpoint.common.hbase.HbaseTemplate;
import com.navercorp.pinpoint.common.hbase.HbaseVersionCheckBean;
import com.navercorp.pinpoint.common.hbase.TableFactory;
import com.navercorp.pinpoint.common.hbase.async.AsyncBufferedMutatorCustomizer;
import com.navercorp.pinpoint.common.hbase.async.AsyncBufferedMutatorFactory;
import com.navercorp.pinpoint.common.hbase.async.AsyncHbasePutWriter;
import com.navercorp.pinpoint.common.hbase.async.AsyncTableCustomizer;
import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory;
import com.navercorp.pinpoint.common.hbase.async.BatchAsyncHbasePutWriter;
import com.navercorp.pinpoint.common.hbase.async.ConcurrencyDecorator;
import com.navercorp.pinpoint.common.hbase.async.DefaultAsyncBufferedMutatorCustomizer;
import com.navercorp.pinpoint.common.hbase.async.DefaultAsyncTableCustomizer;
import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncBufferedMutatorFactory;
import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncCacheConfiguration;
import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTableFactory;
import com.navercorp.pinpoint.common.hbase.async.HbasePutWriter;
Expand Down Expand Up @@ -78,6 +82,19 @@
return new HbaseAsyncTableFactory(connection, customizer);
}

@Bean
@ConfigurationProperties(prefix = "hbase.client.put-writer.async-buffered-mutator")
public AsyncBufferedMutatorCustomizer asyncBufferedMutatorCustomizer() {
return new DefaultAsyncBufferedMutatorCustomizer();

Check warning on line 88 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L88

Added line #L88 was not covered by tests
}

@Bean
public AsyncBufferedMutatorFactory hbaseAsyncBufferedMutatorFactory(@Qualifier("hbaseAsyncConnection") AsyncConnection connection,
AsyncBufferedMutatorCustomizer customizer) {
logger.info("AsyncBufferedMutatorCustomizer {}", customizer);
return new HbaseAsyncBufferedMutatorFactory(connection, customizer);

Check warning on line 95 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L94-L95

Added lines #L94 - L95 were not covered by tests
}


@Bean
@ConditionalOnProperty(name = "hbase.client.parallel.scan.enable", havingValue = "true")
Expand Down Expand Up @@ -136,7 +153,7 @@
@Primary
@Bean
public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter putWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbasePutWriter {}", putWriter);
return putWriter;
Expand All @@ -149,7 +166,7 @@

@Bean
public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter putWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbaseSpanPutWriter {}", putWriter);
return putWriter;
Expand Down Expand Up @@ -179,8 +196,8 @@

@Primary
@Bean
public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncBufferedMutatorFactory") AsyncBufferedMutatorFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter hbasePutWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbasePutWriter {}", hbasePutWriter);
return hbasePutWriter;
Expand All @@ -192,8 +209,8 @@
}

@Bean
public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncBufferedMutatorFactory") AsyncBufferedMutatorFactory asyncTableFactory,
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter hbasePutWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("HbaseSpanPutWriter {}", hbasePutWriter);
return hbasePutWriter;
Expand All @@ -204,7 +221,7 @@
return new ConcurrencyDecorator(concurrency);
}

private HbasePutWriter newPutWriter(AsyncTableFactory asyncTableFactory, HbasePutWriterDecorator decorator) {
private HbasePutWriter newPutWriter(AsyncBufferedMutatorFactory asyncTableFactory, HbasePutWriterDecorator decorator) {
HbasePutWriter writer = new BatchAsyncHbasePutWriter(asyncTableFactory);
HbasePutWriter putWriter = decorator.decorator(writer);
return new LoggingHbasePutWriter(putWriter);
Expand Down