Skip to content

Commit

Permalink
Merge pull request #377 from vihas-splunk/make-validations-configurablee
Browse files Browse the repository at this point in the history
Add a new config parameter for disabling splunk validations
  • Loading branch information
VihasMakwana authored Feb 2, 2023
2 parents 63c3545 + a8e42bb commit d8f9584
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.sources` | Splunk event source metadata for Kafka topic data. The same configuration rules as indexes can be applied. If left unconfigured, the default source binds to the HEC token. | `""` |
| `splunk.sourcetypes` | Splunk event sourcetype metadata for Kafka topic data. The same configuration rules as indexes can be applied here. If left unconfigured, the default sourcetype binds to the HEC token. | `""` |
| `splunk.flush.window` | The interval in seconds at which the events from kafka connect will be flushed to Splunk. | `30` |
| `splunk.validation.disable` | Disable validating splunk configurations before creating task. | `false` |
| `splunk.hec.ssl.validate.certs` | Valid settings are `true` or `false`. Enables or disables HTTPS certification validation. |`true`|
| `splunk.hec.http.keepalive` | Valid settings are `true` or `false`. Enables or disables HTTP connection keep-alive. |`true`|
| `splunk.hec.max.http.connection.per.channel` | Controls how many HTTP connections will be created and cached in the HTTP pool for one HEC channel. |`2`|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ private static String[] split(String data, String sep) {

private void validateSplunkConfigurations(final Map<String, String> configs) throws ConfigException {
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
if (connectorConfig.disableValidation) {
return;
}
String[] indexes = split(connectorConfig.indexes, ",");
if(indexes == null || indexes.length == 0) {
preparePayloadAndExecuteRequest(connectorConfig, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String SOURCE_CONF = "splunk.sources";
static final String SOURCETYPE_CONF = "splunk.sourcetypes";
static final String FLUSH_WINDOW_CONF = "splunk.flush.window";
static final String DISABLE_VALIDATION = "splunk.validation.disable";
static final String TOTAL_HEC_CHANNEL_CONF = "splunk.hec.total.channels";
static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel";
static final String MAX_BATCH_SIZE_CONF = "splunk.hec.max.batch.size"; // record count
Expand Down Expand Up @@ -128,6 +129,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String SSL_VALIDATE_CERTIFICATES_DOC = "Valid settings are true or false. Enables or disables HTTPS "
+ "certification validation. By default, this is set to true.";
static final String ENABLE_COMPRESSSION_DOC = "Valid settings are true or false. Used for enable or disable gzip-compression. By default, this is set to false.";
static final String DISABLE_VALIDATION_DOC = "Disable validating splunk configurations before creating task.";
// Acknowledgement Parameters
// Use Ack
static final String ACK_DOC = "Valid settings are true or false. When set to true Splunk Connect for Kafka will "
Expand Down Expand Up @@ -218,6 +220,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final int socketTimeout;
final boolean validateCertificates;
final boolean enableCompression;
final boolean disableValidation;
final int lbPollInterval;

final boolean ack;
Expand Down Expand Up @@ -304,6 +307,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
kerberosUserPrincipal = getString(KERBEROS_USER_PRINCIPAL_CONF);
kerberosKeytabPath = getString(KERBEROS_KEYTAB_PATH_CONF);
enableCompression = getBoolean(ENABLE_COMPRESSSION_CONF);
disableValidation = getBoolean(DISABLE_VALIDATION);
enableTimestampExtraction = getBoolean(ENABLE_TIMESTAMP_EXTRACTION_CONF);
regex = getString(REGEX_CONF);
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
Expand Down Expand Up @@ -351,6 +355,7 @@ public static ConfigDef conf() {
.define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC)
.define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC)
.define(ENABLE_COMPRESSSION_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ENABLE_COMPRESSSION_DOC)
.define(DISABLE_VALIDATION, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, DISABLE_VALIDATION_DOC)
.define(KERBEROS_USER_PRINCIPAL_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_USER_PRINCIPAL_DOC)
.define(KERBEROS_KEYTAB_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC)
.define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
Expand Down Expand Up @@ -425,6 +430,7 @@ public String toString() {
+ "headerSourcetype:" + headerSourcetype + ", "
+ "headerHost:" + headerHost + ", "
+ "enableCompression:" + enableCompression + ", "
+ "disableValidation:" + disableValidation + ", "
+ "lbPollInterval:" + lbPollInterval;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,32 @@ public void testValidSplunkConfigurations() {
Assertions.assertDoesNotThrow(()->connector.validate(configs));
}

@Test
public void testInvalidSplunkConfigurationsWithValidationDisabled() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("splunk.validation.disable", "true");
configs.put("topics", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.exception);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertDoesNotThrow(()->connector.validate(configs));
}

@Test
public void testInvalidSplunkConfigurationsWithValidationEnabled() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("splunk.validation.disable", "false");
configs.put("topics", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.exception);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
}

private void addNecessaryConfigs(Map<String, String> configs) {
configs.put(URI_CONF, TEST_URI);
configs.put(TOKEN_CONF, "blah");
Expand Down

0 comments on commit d8f9584

Please sign in to comment.