diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java index 802804ba..1a965bd3 100644 --- a/src/main/java/com/splunk/hecclient/Hec.java +++ b/src/main/java/com/splunk/hecclient/Hec.java @@ -287,7 +287,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { } // Code block for custom keystore client construction - SSLContext context = loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStorePassword()); + SSLContext context = loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword()); if (context != null) { return new HttpClientBuilder() @@ -309,6 +309,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { * a Hec Client with custom key store functionality. * * @param path A file path to the custom key store to be used. + * @param type The type of the key store file. * @param pass The password for the key store file. * @since 1.1.0 * @throws HecException @@ -316,9 +317,9 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { * @see KeyStore * @see SSLContext */ - public static SSLContext loadCustomSSLContext(String path, String pass) { + public static SSLContext loadCustomSSLContext(String path, String type, String pass) { try { - KeyStore ks = KeyStore.getInstance("JKS"); + KeyStore ks = KeyStore.getInstance(type); FileInputStream fileInputStream = new FileInputStream(path); ks.load(fileInputStream, pass.toCharArray()); diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java index d6421603..d29eccb5 100644 --- a/src/main/java/com/splunk/hecclient/HecConfig.java +++ b/src/main/java/com/splunk/hecclient/HecConfig.java @@ -34,6 +34,7 @@ public final class HecConfig { private boolean enableChannelTracking = false; private boolean hasCustomTrustStore = false; private String trustStorePath; + private String trustStoreType = "JKS"; private String trustStorePassword; private int lbPollInterval = 120; // in seconds private String kerberosPrincipal; @@ -104,6 +105,8 @@ public int getBackoffThresholdSeconds() { public String getTrustStorePath() { return trustStorePath; } + public String getTrustStoreType() { return trustStoreType; } + public String getTrustStorePassword() { return trustStorePassword; } public HecConfig setDisableSSLCertVerification(boolean disableVerfication) { @@ -161,6 +164,11 @@ public HecConfig setTrustStorePath(String path) { return this; } + public HecConfig setTrustStoreType(String type) { + trustStoreType = type; + return this; + } + public HecConfig setTrustStorePassword(String pass) { trustStorePassword = pass; return this; diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 937c2bea..dddd3291 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -74,6 +74,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String HEC_EVENT_FORMATTED_CONF = "splunk.hec.json.event.formatted"; // Trust store static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path"; + static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type"; static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password"; //Headers static final String HEADER_SUPPORT_CONF = "splunk.header.support"; @@ -178,6 +179,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { + "correctly by Splunk."; // TBD static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store."; + static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...)."; static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store."; static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override"; @@ -236,6 +238,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final boolean hasTrustStorePath; final String trustStorePath; + final String trustStoreType; final String trustStorePassword; final boolean headerSupport; @@ -265,6 +268,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF); trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF); hasTrustStorePath = StringUtils.isNotBlank(trustStorePath); + trustStoreType = getString(SSL_TRUSTSTORE_TYPE_CONF); trustStorePassword = getPassword(SSL_TRUSTSTORE_PASSWORD_CONF).value(); validateHttpsConfig(splunkURI); eventBatchTimeout = getInt(EVENT_TIMEOUT_CONF); @@ -318,6 +322,7 @@ public static ConfigDef conf() { .define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC) .define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC) .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC) + .define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC) .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC) .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC) .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC) @@ -368,6 +373,7 @@ public HecConfig getHecConfig() { .setEnableChannelTracking(trackData) .setBackoffThresholdSeconds(backoffThresholdSeconds) .setTrustStorePath(trustStorePath) + .setTrustStoreType(trustStoreType) .setTrustStorePassword(trustStorePassword) .setHasCustomTrustStore(hasTrustStorePath) .setKerberosPrincipal(kerberosUserPrincipal) @@ -393,6 +399,7 @@ public String toString() { + "httpKeepAlive:" + httpKeepAlive + ", " + "validateCertificates:" + validateCertificates + ", " + "trustStorePath:" + trustStorePath + ", " + + "trustStoreType:" + trustStoreType + ", " + "socketTimeout:" + socketTimeout + ", " + "eventBatchTimeout:" + eventBatchTimeout + ", " + "ackPollInterval:" + ackPollInterval + ", " @@ -544,4 +551,4 @@ private static boolean getNamedGroupCandidates(String regex) { } return false; } -} \ No newline at end of file +} diff --git a/src/test/java/com/splunk/hecclient/HecConfigTest.java b/src/test/java/com/splunk/hecclient/HecConfigTest.java index a43d9054..3826bff4 100644 --- a/src/test/java/com/splunk/hecclient/HecConfigTest.java +++ b/src/test/java/com/splunk/hecclient/HecConfigTest.java @@ -44,6 +44,7 @@ public void getterSetter() { .setEnableChannelTracking(true) .setEventBatchTimeout(7) .setTrustStorePath("test") + .setTrustStoreType("PKCS12") .setTrustStorePassword("pass") .setHasCustomTrustStore(true) .setBackoffThresholdSeconds(10) @@ -60,6 +61,7 @@ public void getterSetter() { Assert.assertEquals(6, config.getAckPollThreads()); Assert.assertEquals(7, config.getEventBatchTimeout()); Assert.assertEquals("test", config.getTrustStorePath()); + Assert.assertEquals("PKCS12", config.getTrustStoreType()); Assert.assertEquals("pass", config.getTrustStorePassword()); Assert.assertEquals(10000, config.getBackoffThresholdSeconds()); Assert.assertEquals(120000, config.getlbPollInterval()); diff --git a/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java b/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java index 3d5fbfff..80490f4f 100644 --- a/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java +++ b/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java @@ -52,7 +52,19 @@ public void buildSecureCustomKeystore() { .setSocketSendBufferSize(1024) .setSocketTimeout(120) .setDisableSSLCertVerification(false) - .setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.jks","Notchangeme")) + .setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.jks", "JKS", "Notchangeme")) + .build(); + Assert.assertNotNull(client); + } + @Test + public void buildSecureCustomKeystorePkcs12() { + HttpClientBuilder builder = new HttpClientBuilder(); + CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1) + .setMaxConnectionPoolSize(2) + .setSocketSendBufferSize(1024) + .setSocketTimeout(120) + .setDisableSSLCertVerification(false) + .setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.p12", "PKCS12", "Notchangeme")) .build(); Assert.assertNotNull(client); } @@ -63,4 +75,4 @@ public void buildDefault() { CloseableHttpClient client = builder.build(); Assert.assertNotNull(client); } -} \ No newline at end of file +} diff --git a/src/test/java/com/splunk/kafka/connect/ConfigProfile.java b/src/test/java/com/splunk/kafka/connect/ConfigProfile.java index b8085d4e..f6c75b9f 100644 --- a/src/test/java/com/splunk/kafka/connect/ConfigProfile.java +++ b/src/test/java/com/splunk/kafka/connect/ConfigProfile.java @@ -17,6 +17,7 @@ public class ConfigProfile { private boolean validateCertificates; private boolean hasTrustStorePath; private String trustStorePath; + private String trustStoreType; private String trustStorePassword; private int eventBatchTimeout; private int ackPollInterval; @@ -77,6 +78,7 @@ public ConfigProfile buildProfileDefault() { this.validateCertificates = true; this.hasTrustStorePath = true; this.trustStorePath = "./src/test/resources/keystoretest.jks"; + this.trustStoreType = "JKS"; this.trustStorePassword = "Notchangeme"; this.eventBatchTimeout = 1; this.ackPollInterval = 1; @@ -110,7 +112,8 @@ public ConfigProfile buildProfileOne() { this.httpKeepAlive = true; this.validateCertificates = true; this.hasTrustStorePath = true; - this.trustStorePath = "./src/test/resources/keystoretest.jks"; + this.trustStorePath = "./src/test/resources/keystoretest.p12"; + this.trustStoreType = "PKCS12"; this.trustStorePassword = "Notchangeme"; this.eventBatchTimeout = 1; this.ackPollInterval = 1; @@ -332,6 +335,14 @@ public void setTrustStorePath(String trustStorePath) { this.trustStorePath = trustStorePath; } + public String getTrustStoreType() { + return trustStoreType; + } + + public void setTrustStoreType(String trustStoreType) { + this.trustStoreType = trustStoreType; + } + public String getTrustStorePassword() { return trustStorePassword; } @@ -461,6 +472,6 @@ public void setHeaderHost(String headerHost) { } @Override public String toString() { - return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}'; + return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}'; } } diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java index 82f1c997..2113fade 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java @@ -83,6 +83,7 @@ public void getHecConfigCustomKeystore() { HecConfig config = connectorConfig.getHecConfig(); Assert.assertEquals(true, config.getHasCustomTrustStore()); Assert.assertEquals(uu.configProfile.getTrustStorePath(), config.getTrustStorePath()); + Assert.assertEquals(uu.configProfile.getTrustStoreType(), config.getTrustStoreType()); Assert.assertEquals(uu.configProfile.getTrustStorePassword(), config.getTrustStorePassword()); } @@ -95,9 +96,10 @@ public void testCustomKeystore() throws KeyStoreException { HecConfig config = connectorConfig.getHecConfig(); Assert.assertEquals(true, config.getHasCustomTrustStore()); Assert.assertEquals(uu.configProfile.getTrustStorePath(), config.getTrustStorePath()); + Assert.assertEquals(uu.configProfile.getTrustStoreType(), config.getTrustStoreType()); Assert.assertEquals(uu.configProfile.getTrustStorePassword(), config.getTrustStorePassword()); - SSLContext context = Hec.loadCustomSSLContext(config.getTrustStorePath(),config.getTrustStorePassword()); + SSLContext context = Hec.loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword()); Assert.assertNotNull(context); } @@ -315,6 +317,7 @@ private void commonAssert(final SplunkSinkConnectorConfig connectorConfig) { Assert.assertEquals(uu.configProfile.isHttpKeepAlive(), connectorConfig.httpKeepAlive); Assert.assertEquals(uu.configProfile.isValidateCertificates(), connectorConfig.validateCertificates); Assert.assertEquals(uu.configProfile.getTrustStorePath(), connectorConfig.trustStorePath); + Assert.assertEquals(uu.configProfile.getTrustStoreType(), connectorConfig.trustStoreType); Assert.assertEquals(uu.configProfile.getTrustStorePassword(), connectorConfig.trustStorePassword); Assert.assertEquals(uu.configProfile.getEventBatchTimeout(), connectorConfig.eventBatchTimeout); Assert.assertEquals(uu.configProfile.getAckPollInterval(), connectorConfig.ackPollInterval); diff --git a/src/test/java/com/splunk/kafka/connect/UnitUtil.java b/src/test/java/com/splunk/kafka/connect/UnitUtil.java index 1c2b3296..85ae6e04 100644 --- a/src/test/java/com/splunk/kafka/connect/UnitUtil.java +++ b/src/test/java/com/splunk/kafka/connect/UnitUtil.java @@ -45,6 +45,7 @@ public Map createTaskConfig() { if(configProfile.getTrustStorePath() != null ) { config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_PATH_CONF, configProfile.getTrustStorePath()); + config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_TYPE_CONF, configProfile.getTrustStoreType()); config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_PASSWORD_CONF, configProfile.getTrustStorePassword()); } diff --git a/src/test/resources/keystoretest.p12 b/src/test/resources/keystoretest.p12 new file mode 100644 index 00000000..5aa24124 Binary files /dev/null and b/src/test/resources/keystoretest.p12 differ