From 35e2f64a5083327142c6c13bd801daa8743094d9 Mon Sep 17 00:00:00 2001 From: xuthus5 Date: Fri, 18 Aug 2023 20:10:19 +0800 Subject: [PATCH] feat: pulsar producer support configure auth token --- .../perftool/mq/producer/pulsar/PulsarConfig.java | 9 +++++++++ .../mq/producer/pulsar/PulsarSendThread.java | 12 ++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/perf-mq-producer-pulsar/src/main/java/com/github/perftool/mq/producer/pulsar/PulsarConfig.java b/perf-mq-producer-pulsar/src/main/java/com/github/perftool/mq/producer/pulsar/PulsarConfig.java index cf1386a..b5910ac 100644 --- a/perf-mq-producer-pulsar/src/main/java/com/github/perftool/mq/producer/pulsar/PulsarConfig.java +++ b/perf-mq-producer-pulsar/src/main/java/com/github/perftool/mq/producer/pulsar/PulsarConfig.java @@ -114,6 +114,15 @@ public class PulsarConfig { @Value("${PULSAR_TLS_TRUSTSTORE_PASSWORD:}") public String tlsTrustStorePassword; + @Value("${PULSAR_AUTH_TOKEN_ENABLE:false}") + public boolean authTokenEnable; + + @Value("${PULSAR_AUTH_TOKEN:}") + public String authToken; + + @Value("${PULSAR_SECURE_CONNECTION_ENABLE:false}") + public boolean secureConnectionEnable; + @Value("${TRACE_REPORT_SCENE:default}") public String traceReportScene; diff --git a/perf-mq-producer-pulsar/src/main/java/com/github/perftool/mq/producer/pulsar/PulsarSendThread.java b/perf-mq-producer-pulsar/src/main/java/com/github/perftool/mq/producer/pulsar/PulsarSendThread.java index 320493c..1b4c88d 100644 --- a/perf-mq-producer-pulsar/src/main/java/com/github/perftool/mq/producer/pulsar/PulsarSendThread.java +++ b/perf-mq-producer-pulsar/src/main/java/com/github/perftool/mq/producer/pulsar/PulsarSendThread.java @@ -33,6 +33,7 @@ import io.github.perftool.trace.util.InboundCounter; import io.github.perftool.trace.util.JacksonUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -82,14 +83,21 @@ public void init() throws Exception { .memoryLimit(pulsarConfig.memoryLimitMb, SizeUnit.MEGA_BYTES) .ioThreads(pulsarConfig.pulsarIoThreads) .maxConcurrentLookupRequests(pulsarConfig.pulsarMaxConcurrentLookupRequests); + if (pulsarConfig.authTokenEnable) { + clientBuilder.authentication(AuthenticationFactory.token(pulsarConfig.authToken)); + } + if (!pulsarConfig.secureConnectionEnable) { + clientBuilder.allowTlsInsecureConnection(true) + .enableTlsHostnameVerification(false); + } if (pulsarConfig.tlsEnable) { Map map = new HashMap<>(); map.put("keyStoreType", "JKS"); map.put("keyStorePath", pulsarConfig.keyStorePath); map.put("keyStorePassword", pulsarConfig.keyStorePassword); - pulsarClient = clientBuilder.allowTlsInsecureConnection(true).serviceUrl(String.format("%s://%s:%s", + pulsarClient = clientBuilder.serviceUrl(String.format("%s://%s:%s", pulsarConfig.protocol, pulsarConfig.host, pulsarConfig.port)) - .enableTlsHostnameVerification(false).useKeyStoreTls(true) + .useKeyStoreTls(true) .tlsTrustStoreType("JKS") .tlsTrustStorePath(pulsarConfig.tlsTrustStorePath) .tlsTrustStorePassword(pulsarConfig.tlsTrustStorePassword)