Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle other type of truststore #350

Merged
merged 1 commit into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/main/java/com/splunk/hecclient/Hec.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
}

// Code block for custom keystore client construction
SSLContext context = loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStorePassword());
SSLContext context = loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword());

if (context != null) {
return new HttpClientBuilder()
Expand All @@ -309,16 +309,17 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
* a Hec Client with custom key store functionality.
*
* @param path A file path to the custom key store to be used.
* @param type The type of the key store file.
* @param pass The password for the key store file.
* @since 1.1.0
* @throws HecException
* @return A configured SSLContect to be used in a CloseableHttpClient
* @see KeyStore
* @see SSLContext
*/
public static SSLContext loadCustomSSLContext(String path, String pass) {
public static SSLContext loadCustomSSLContext(String path, String type, String pass) {
try {
KeyStore ks = KeyStore.getInstance("JKS");
KeyStore ks = KeyStore.getInstance(type);
FileInputStream fileInputStream = new FileInputStream(path);
ks.load(fileInputStream, pass.toCharArray());

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/splunk/hecclient/HecConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class HecConfig {
private boolean enableChannelTracking = false;
private boolean hasCustomTrustStore = false;
private String trustStorePath;
private String trustStoreType = "JKS";
private String trustStorePassword;
private int lbPollInterval = 120; // in seconds
private String kerberosPrincipal;
Expand Down Expand Up @@ -104,6 +105,8 @@ public int getBackoffThresholdSeconds() {

public String getTrustStorePath() { return trustStorePath; }

public String getTrustStoreType() { return trustStoreType; }

public String getTrustStorePassword() { return trustStorePassword; }

public HecConfig setDisableSSLCertVerification(boolean disableVerfication) {
Expand Down Expand Up @@ -161,6 +164,11 @@ public HecConfig setTrustStorePath(String path) {
return this;
}

public HecConfig setTrustStoreType(String type) {
trustStoreType = type;
return this;
}

public HecConfig setTrustStorePassword(String pass) {
trustStorePassword = pass;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String HEC_EVENT_FORMATTED_CONF = "splunk.hec.json.event.formatted";
// Trust store
static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path";
static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type";
static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password";
//Headers
static final String HEADER_SUPPORT_CONF = "splunk.header.support";
Expand Down Expand Up @@ -178,6 +179,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
+ "correctly by Splunk.";
// TBD
static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...).";
static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";

static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
Expand Down Expand Up @@ -236,6 +238,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {

final boolean hasTrustStorePath;
final String trustStorePath;
final String trustStoreType;
final String trustStorePassword;

final boolean headerSupport;
Expand Down Expand Up @@ -265,6 +268,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF);
trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF);
hasTrustStorePath = StringUtils.isNotBlank(trustStorePath);
trustStoreType = getString(SSL_TRUSTSTORE_TYPE_CONF);
trustStorePassword = getPassword(SSL_TRUSTSTORE_PASSWORD_CONF).value();
validateHttpsConfig(splunkURI);
eventBatchTimeout = getInt(EVENT_TIMEOUT_CONF);
Expand Down Expand Up @@ -318,6 +322,7 @@ public static ConfigDef conf() {
.define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC)
.define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
.define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
.define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
.define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
.define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
Expand Down Expand Up @@ -368,6 +373,7 @@ public HecConfig getHecConfig() {
.setEnableChannelTracking(trackData)
.setBackoffThresholdSeconds(backoffThresholdSeconds)
.setTrustStorePath(trustStorePath)
.setTrustStoreType(trustStoreType)
.setTrustStorePassword(trustStorePassword)
.setHasCustomTrustStore(hasTrustStorePath)
.setKerberosPrincipal(kerberosUserPrincipal)
Expand All @@ -393,6 +399,7 @@ public String toString() {
+ "httpKeepAlive:" + httpKeepAlive + ", "
+ "validateCertificates:" + validateCertificates + ", "
+ "trustStorePath:" + trustStorePath + ", "
+ "trustStoreType:" + trustStoreType + ", "
+ "socketTimeout:" + socketTimeout + ", "
+ "eventBatchTimeout:" + eventBatchTimeout + ", "
+ "ackPollInterval:" + ackPollInterval + ", "
Expand Down Expand Up @@ -544,4 +551,4 @@ private static boolean getNamedGroupCandidates(String regex) {
}
return false;
}
}
}
2 changes: 2 additions & 0 deletions src/test/java/com/splunk/hecclient/HecConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void getterSetter() {
.setEnableChannelTracking(true)
.setEventBatchTimeout(7)
.setTrustStorePath("test")
.setTrustStoreType("PKCS12")
.setTrustStorePassword("pass")
.setHasCustomTrustStore(true)
.setBackoffThresholdSeconds(10)
Expand All @@ -60,6 +61,7 @@ public void getterSetter() {
Assert.assertEquals(6, config.getAckPollThreads());
Assert.assertEquals(7, config.getEventBatchTimeout());
Assert.assertEquals("test", config.getTrustStorePath());
Assert.assertEquals("PKCS12", config.getTrustStoreType());
Assert.assertEquals("pass", config.getTrustStorePassword());
Assert.assertEquals(10000, config.getBackoffThresholdSeconds());
Assert.assertEquals(120000, config.getlbPollInterval());
Expand Down
16 changes: 14 additions & 2 deletions src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,19 @@ public void buildSecureCustomKeystore() {
.setSocketSendBufferSize(1024)
.setSocketTimeout(120)
.setDisableSSLCertVerification(false)
.setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.jks","Notchangeme"))
.setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.jks", "JKS", "Notchangeme"))
.build();
Assert.assertNotNull(client);
}
@Test
public void buildSecureCustomKeystorePkcs12() {
HttpClientBuilder builder = new HttpClientBuilder();
CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1)
.setMaxConnectionPoolSize(2)
.setSocketSendBufferSize(1024)
.setSocketTimeout(120)
.setDisableSSLCertVerification(false)
.setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.p12", "PKCS12", "Notchangeme"))
.build();
Assert.assertNotNull(client);
}
Expand All @@ -63,4 +75,4 @@ public void buildDefault() {
CloseableHttpClient client = builder.build();
Assert.assertNotNull(client);
}
}
}
15 changes: 13 additions & 2 deletions src/test/java/com/splunk/kafka/connect/ConfigProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ConfigProfile {
private boolean validateCertificates;
private boolean hasTrustStorePath;
private String trustStorePath;
private String trustStoreType;
private String trustStorePassword;
private int eventBatchTimeout;
private int ackPollInterval;
Expand Down Expand Up @@ -77,6 +78,7 @@ public ConfigProfile buildProfileDefault() {
this.validateCertificates = true;
this.hasTrustStorePath = true;
this.trustStorePath = "./src/test/resources/keystoretest.jks";
this.trustStoreType = "JKS";
this.trustStorePassword = "Notchangeme";
this.eventBatchTimeout = 1;
this.ackPollInterval = 1;
Expand Down Expand Up @@ -110,7 +112,8 @@ public ConfigProfile buildProfileOne() {
this.httpKeepAlive = true;
this.validateCertificates = true;
this.hasTrustStorePath = true;
this.trustStorePath = "./src/test/resources/keystoretest.jks";
this.trustStorePath = "./src/test/resources/keystoretest.p12";
this.trustStoreType = "PKCS12";
this.trustStorePassword = "Notchangeme";
this.eventBatchTimeout = 1;
this.ackPollInterval = 1;
Expand Down Expand Up @@ -332,6 +335,14 @@ public void setTrustStorePath(String trustStorePath) {
this.trustStorePath = trustStorePath;
}

public String getTrustStoreType() {
return trustStoreType;
}

public void setTrustStoreType(String trustStoreType) {
this.trustStoreType = trustStoreType;
}

public String getTrustStorePassword() {
return trustStorePassword;
}
Expand Down Expand Up @@ -461,6 +472,6 @@ public void setHeaderHost(String headerHost) {
}

@Override public String toString() {
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void getHecConfigCustomKeystore() {
HecConfig config = connectorConfig.getHecConfig();
Assert.assertEquals(true, config.getHasCustomTrustStore());
Assert.assertEquals(uu.configProfile.getTrustStorePath(), config.getTrustStorePath());
Assert.assertEquals(uu.configProfile.getTrustStoreType(), config.getTrustStoreType());
Assert.assertEquals(uu.configProfile.getTrustStorePassword(), config.getTrustStorePassword());
}

Expand All @@ -95,9 +96,10 @@ public void testCustomKeystore() throws KeyStoreException {
HecConfig config = connectorConfig.getHecConfig();
Assert.assertEquals(true, config.getHasCustomTrustStore());
Assert.assertEquals(uu.configProfile.getTrustStorePath(), config.getTrustStorePath());
Assert.assertEquals(uu.configProfile.getTrustStoreType(), config.getTrustStoreType());
Assert.assertEquals(uu.configProfile.getTrustStorePassword(), config.getTrustStorePassword());

SSLContext context = Hec.loadCustomSSLContext(config.getTrustStorePath(),config.getTrustStorePassword());
SSLContext context = Hec.loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword());
Assert.assertNotNull(context);

}
Expand Down Expand Up @@ -315,6 +317,7 @@ private void commonAssert(final SplunkSinkConnectorConfig connectorConfig) {
Assert.assertEquals(uu.configProfile.isHttpKeepAlive(), connectorConfig.httpKeepAlive);
Assert.assertEquals(uu.configProfile.isValidateCertificates(), connectorConfig.validateCertificates);
Assert.assertEquals(uu.configProfile.getTrustStorePath(), connectorConfig.trustStorePath);
Assert.assertEquals(uu.configProfile.getTrustStoreType(), connectorConfig.trustStoreType);
Assert.assertEquals(uu.configProfile.getTrustStorePassword(), connectorConfig.trustStorePassword);
Assert.assertEquals(uu.configProfile.getEventBatchTimeout(), connectorConfig.eventBatchTimeout);
Assert.assertEquals(uu.configProfile.getAckPollInterval(), connectorConfig.ackPollInterval);
Expand Down
1 change: 1 addition & 0 deletions src/test/java/com/splunk/kafka/connect/UnitUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public Map<String, String> createTaskConfig() {

if(configProfile.getTrustStorePath() != null ) {
config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_PATH_CONF, configProfile.getTrustStorePath());
config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_TYPE_CONF, configProfile.getTrustStoreType());
config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_PASSWORD_CONF, configProfile.getTrustStorePassword());
}

Expand Down
Binary file added src/test/resources/keystoretest.p12
Binary file not shown.