Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#10976] Add additional spanPutWriter #10998

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,19 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;

import java.util.List;

@Configuration
@Import(CommonCacheManagerConfiguration.class)
@Import({
CommonCacheManagerConfiguration.class,
HbaseAsyncConfiguration.TableMultiplexerConfig.class
})
public class HbaseAsyncConfiguration {

private final Logger logger = LogManager.getLogger(HbaseAsyncConfiguration.class);

@Bean
public HbaseMultiplexerProperties hbaseMultiplexerProperties() {
return new HbaseMultiplexerProperties();
}

@Bean
public HbaseBatchPerformanceCounter batchPerformanceCounter() {
return new HbaseBatchPerformanceCounter();
Expand All @@ -60,16 +59,43 @@
return new HBaseAsyncOperationMetrics(hBaseAsyncOperationList);
}

@Bean
@Configuration
@ConditionalOnProperty(name = "hbase.client.put-writer", havingValue = "tableMultiplexer")
public HbasePutWriter tableMultiplexerBatchWriter(@Qualifier("batchConnectionFactory") Connection connection,
HbaseBatchPerformanceCounter counter,
HbaseMultiplexerProperties properties) throws Exception {
HBaseTableMultiplexerFactory factory = new HBaseTableMultiplexerFactory(connection, counter);
factory.setHbaseMultiplexerProperties(properties);
HbasePutWriter writer = factory.getObject();
logger.info("HbasePutWriter:{}", writer);
return new LoggingHbasePutWriter(writer);
public static class TableMultiplexerConfig {
private final Logger logger = LogManager.getLogger(TableMultiplexerConfig.class);

Check warning on line 65 in collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java#L64-L65

Added lines #L64 - L65 were not covered by tests

@Bean
public HbaseMultiplexerProperties hbaseMultiplexerProperties() {
return new HbaseMultiplexerProperties();

Check warning on line 69 in collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java#L69

Added line #L69 was not covered by tests
}

@Primary
@Bean
public HbasePutWriter hbasePutWriter(@Qualifier("batchConnectionFactory") Connection connection,
HbaseBatchPerformanceCounter counter,
HbaseMultiplexerProperties properties) throws Exception {
HbasePutWriter writer = newPutWriter(connection, counter, properties);
logger.info("HbaseSpanPutWriter:{}", writer);
return writer;

Check warning on line 79 in collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java#L77-L79

Added lines #L77 - L79 were not covered by tests
}

@Bean
public HbasePutWriter spanPutWriter(@Qualifier("batchConnectionFactory") Connection connection,
HbaseBatchPerformanceCounter counter,
HbaseMultiplexerProperties properties) throws Exception {
HbasePutWriter writer = newPutWriter(connection, counter, properties);
logger.info("HbaseSpanPutWriter:{}", writer);
return writer;

Check warning on line 88 in collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java#L86-L88

Added lines #L86 - L88 were not covered by tests
}

private HbasePutWriter newPutWriter(Connection connection, HbaseBatchPerformanceCounter counter,
HbaseMultiplexerProperties properties) throws Exception {
HBaseTableMultiplexerFactory factory = new HBaseTableMultiplexerFactory(connection, counter);
factory.setHbaseMultiplexerProperties(properties);
HbasePutWriter writer = factory.getObject();
return new LoggingHbasePutWriter(writer);

Check warning on line 96 in collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/config/HbaseAsyncConfiguration.java#L93-L96

Added lines #L93 - L96 were not covered by tests
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public class HbaseTraceDaoV2 implements TraceDao {
private final RowKeyEncoder<TransactionId> rowKeyEncoder;
private final HbasePutWriter putWriter;

public HbaseTraceDaoV2(HbasePutWriter putWriter,
public HbaseTraceDaoV2(@Qualifier("spanPutWriter")
HbasePutWriter putWriter,
TableNameProvider tableNameProvider,
@Qualifier("traceRowKeyEncoderV2") RowKeyEncoder<TransactionId> rowKeyEncoder,
SpanSerializerV2 spanSerializer,
Expand Down
3 changes: 2 additions & 1 deletion collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ hbase.client.properties.hbase.client.max.retries.in.queue=10000
hbase.client.properties.hbase.client.async.max.retries.in.queue=10000
hbase.client.properties.hbase.client.retries.number=4

# Limit concurrent requests to prevent OOM. default: 100000
# Limit concurrent requests to prevent OOM. Unlimited:0
hbase.client.put-writer.concurrency-limit=100000
hbase.client.span-put-writer.concurrency-limit=0

# hbase async =================================================================
# enable hbase async operation. default: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,10 @@
}


@Override
public String toString() {
return "LoggingHbasePutWriter{" +

Check warning on line 60 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/LoggingHbasePutWriter.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/LoggingHbasePutWriter.java#L60

Added line #L60 was not covered by tests
"delegate=" + delegate +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@
import java.util.Optional;

@org.springframework.context.annotation.Configuration
@Import(HbaseAsyncCacheConfiguration.class)
@Import({
HbaseAsyncCacheConfiguration.class,

HbaseTemplateConfiguration.AsyncBufferedHbasePutWriterConfig.class,
HbaseTemplateConfiguration.AsyncHbasePutWriterConfig.class,
})
public class HbaseTemplateConfiguration {
private final Logger logger = LogManager.getLogger(HbaseTemplateConfiguration.class);

Expand Down Expand Up @@ -118,30 +123,91 @@
return template2;
}

@Bean
@org.springframework.context.annotation.Configuration
@ConditionalOnProperty(name = "hbase.client.put-writer", havingValue = "asyncTable")
public HbasePutWriter asyncHbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
AsyncHbasePutWriter writer = new AsyncHbasePutWriter(asyncTableFactory);
HbasePutWriter putWriter = decorator.decorator(writer);
logger.info("HbasePutWriter {}", putWriter);
return new LoggingHbasePutWriter(putWriter);
}
public static class AsyncHbasePutWriterConfig {
private final Logger logger = LogManager.getLogger(AsyncHbasePutWriterConfig.class);

Check warning on line 129 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L129

Added line #L129 was not covered by tests

@Bean
public HbasePutWriterDecorator concurrencyDecorator(@Value("${hbase.client.put-writer.concurrency-limit:1000000}") int concurrency) {
return new ConcurrencyDecorator(concurrency);
public AsyncHbasePutWriterConfig() {
logger.info("Install {}", AsyncHbasePutWriterConfig.class.getSimpleName());
}

Check warning on line 133 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L131-L133

Added lines #L131 - L133 were not covered by tests

@Primary
@Bean
public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter putWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbasePutWriter {}", putWriter);
return putWriter;

Check warning on line 141 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L139-L141

Added lines #L139 - L141 were not covered by tests
}

@Bean
public HbasePutWriterDecorator concurrencyDecorator(@Value("${hbase.client.put-writer.concurrency-limit:100000}") int concurrency) {
return new ConcurrencyDecorator(concurrency);

Check warning on line 146 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L146

Added line #L146 was not covered by tests
}

@Bean
public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter putWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbaseSpanPutWriter {}", putWriter);
return putWriter;

Check warning on line 154 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L152-L154

Added lines #L152 - L154 were not covered by tests
}

@Bean
public HbasePutWriterDecorator spanConcurrencyDecorator(@Value("${hbase.client.span-put-writer.concurrency-limit:1000000}") int concurrency) {
return new ConcurrencyDecorator(concurrency);

Check warning on line 159 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L159

Added line #L159 was not covered by tests
}

private HbasePutWriter newPutWriter(AsyncTableFactory asyncTableFactory, HbasePutWriterDecorator decorator) {
HbasePutWriter writer = new AsyncHbasePutWriter(asyncTableFactory);
HbasePutWriter putWriter = decorator.decorator(writer);
return new LoggingHbasePutWriter(putWriter);

Check warning on line 165 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L163-L165

Added lines #L163 - L165 were not covered by tests
}
}


@Bean
@org.springframework.context.annotation.Configuration
@ConditionalOnProperty(name = "hbase.client.put-writer", havingValue = "asyncBufferedMutator", matchIfMissing = true)
public HbasePutWriter batchHbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
BatchAsyncHbasePutWriter writer = new BatchAsyncHbasePutWriter(asyncTableFactory);
HbasePutWriter putWriter = decorator.decorator(writer);
logger.info("HbasePutWriter {}", putWriter);
return new LoggingHbasePutWriter(putWriter);
public static class AsyncBufferedHbasePutWriterConfig {
private final Logger logger = LogManager.getLogger(AsyncBufferedHbasePutWriterConfig.class);

Check warning on line 173 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L173

Added line #L173 was not covered by tests

public AsyncBufferedHbasePutWriterConfig() {
logger.info("Install {}", AsyncBufferedHbasePutWriterConfig.class.getSimpleName());
}

Check warning on line 177 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L175-L177

Added lines #L175 - L177 were not covered by tests

@Primary
@Bean
public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter hbasePutWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbasePutWriter {}", hbasePutWriter);
return hbasePutWriter;

Check warning on line 185 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L183-L185

Added lines #L183 - L185 were not covered by tests
}

@Bean
public HbasePutWriterDecorator concurrencyDecorator(@Value("${hbase.client.put-writer.concurrency-limit:100000}") int concurrency) {
return new ConcurrencyDecorator(concurrency);

Check warning on line 190 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L190

Added line #L190 was not covered by tests
}

@Bean
public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter hbasePutWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("HbaseSpanPutWriter {}", hbasePutWriter);
return hbasePutWriter;

Check warning on line 198 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L196-L198

Added lines #L196 - L198 were not covered by tests
}

@Bean
public HbasePutWriterDecorator spanConcurrencyDecorator(@Value("${hbase.client.span-put-writer.concurrency-limit:1000000}") int concurrency) {
return new ConcurrencyDecorator(concurrency);

Check warning on line 203 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L203

Added line #L203 was not covered by tests
}

private HbasePutWriter newPutWriter(AsyncTableFactory asyncTableFactory, HbasePutWriterDecorator decorator) {
HbasePutWriter writer = new BatchAsyncHbasePutWriter(asyncTableFactory);
HbasePutWriter putWriter = decorator.decorator(writer);
return new LoggingHbasePutWriter(putWriter);

Check warning on line 209 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/HbaseTemplateConfiguration.java#L207-L209

Added lines #L207 - L209 were not covered by tests
}
}

@Bean
Expand Down
Loading