From 17cfe1a53b14e8b8b54356ea59f28bc35ee5e223 Mon Sep 17 00:00:00 2001
From: Enrico Mano
Date: Mon, 15 Mar 2021 09:50:04 +0100
Subject: [PATCH 1/4] Make the geolocation field label customizable from config file

---
 .../elasticsearch/ElasticsearchClientService.java   |  8 +++++++-
 .../ElasticsearchRecordConverter.java               | 14 +++++++++++++-
 .../Elasticsearch_7_x_ClientService.java            |  8 +++++++-
 .../engine/spark/KafkaStreamProcessingEngine.scala  |  5 +----
 4 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
index 29d1692b1..f29ab2dc1 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
@@ -204,7 +204,13 @@ public ValidationResult validate(final String subject, final String input) {
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
-
+    PropertyDescriptor GEOLOCATION_FIELD_LABEL = new PropertyDescriptor.Builder()
+            .name("geolocation.output.field.label")
+            .description("Label used to name the output record field for geolocation properties")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
 
     /**
      * Put a given document in elasticsearch bulk processor.
diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
index 46133b998..dafb120f0 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
@@ -41,6 +41,18 @@ class ElasticsearchRecordConverter {
      * @return the json converted record
      */
    static String convertToString(Record record) {
+        return convertToString(record, new String("location"));
+    }
+
+    /**
+     * Converts an Event into an Elasticsearch document
+     * to be indexed later
+     *e
+     * @param record to convert
+     * @param geolocationLabel is the label for the geolocation field
+     * @return the json converted record
+     */
+    static String convertToString(Record record, String geolocationLabel) {
        logger.trace(record.toString());
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
@@ -117,7 +129,7 @@ static String convertToString(Record record) {
 
            if((geolocation[0] != 0) && (geolocation[1] != 0)) {
 
-                document.latlon("geolocation", geolocation[0], geolocation[1]);
+                document.latlon(geolocationLabel, geolocation[0], geolocation[1]);
            }
 
 
diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/Elasticsearch_7_x_ClientService.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/Elasticsearch_7_x_ClientService.java
index 298e07387..ceb7a11ec 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/Elasticsearch_7_x_ClientService.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/Elasticsearch_7_x_ClientService.java
@@ -108,6 +108,7 @@ public class Elasticsearch_7_x_ClientService extends AbstractControllerService i
    private volatile String authToken;
    protected volatile transient BulkProcessor bulkProcessor;
    protected volatile Map errors = new HashMap<>();
+    private String geolocationFieldLabel;
 
    @Override
    public List getSupportedPropertyDescriptors() {
@@ -129,6 +130,7 @@ public List getSupportedPropertyDescriptors() {
        props.add(HOSTS);
        props.add(PROP_SSL_CONTEXT_SERVICE);
        props.add(CHARSET);
+        props.add(GEOLOCATION_FIELD_LABEL);
 
        return Collections.unmodifiableList(props);
    }
@@ -162,6 +164,7 @@ protected void createElasticsearchClient(ControllerServiceInitializationContext
        final String password = context.getPropertyValue(PASSWORD).asString();
        final String hosts = context.getPropertyValue(HOSTS).asString();
        final boolean enableSsl = context.getPropertyValue(ENABLE_SSL).asBoolean();
+        geolocationFieldLabel = context.getPropertyValue(GEOLOCATION_FIELD_LABEL).asString();
 
        esHosts = getEsHosts(hosts, enableSsl);
 
@@ -799,7 +802,7 @@ public void bulkPut(String indexName, Record record) throws DatastoreClientServi
        }
 
 
-        bulkPut(indexName, null, ElasticsearchRecordConverter.convertToString(record), Optional.of(record.getId()));
+        bulkPut(indexName, null, convertRecordToString(record), Optional.of(record.getId()));
    }
 
    @Override
@@ -829,6 +832,9 @@ public long queryCount(String query) {
 
    @Override
    public String convertRecordToString(Record record) {
+        if (geolocationFieldLabel != null) {
+            return ElasticsearchRecordConverter.convertToString(record, geolocationFieldLabel);
+        }
        return ElasticsearchRecordConverter.convertToString(record);
    }
 
diff --git a/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala b/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
index bac9b10d3..124701adf 100644
--- a/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
+++ b/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
@@ -84,7 +84,7 @@ object KafkaStreamProcessingEngine {
            "local(\\[([0-9]+|\\*)(,[0-9]+)?\\])?|" +
            "spark:\\/\\/[a-z0-9\\.\\-]+(:[0-9]+)?(,[a-z0-9\\.\\-]+(:[0-9]+)?)*|" +
            "mesos:\\/\\/((zk:\\/\\/[a-z0-9\\.\\-]+:[0-9]+(,[a-z0-9\\.\\-]+:[0-9]+)*\\/mesos)|(([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+|[a-z][a-z0-9\\.\\-]+)(:[0-9]+)?))|" +
-            "k8s://.+)$")))
+            "k8s:\\/\\/.+)$")))
        .defaultValue("local[2]")
        .build
 
@@ -446,9 +446,6 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
 
        UserMetricsSystem.initialize(sparkContext, "LogislandMetrics")
 
-
-
-
        /**
          * shutdown context gracefully
          */

From af7a6141a580f2e177f072b9b9c2a0a53bb7c9ee Mon Sep 17 00:00:00 2001
From: Enrico Mano
Date: Mon, 15 Mar 2021 14:11:52 +0100
Subject: [PATCH 2/4] Add configuration parameters to allow spark-submit on k8s

---
 .../ElasticsearchClientService.java          |  2 +-
 .../spark/KafkaStreamProcessingEngine.scala  | 42 ++++++++++++++-----
 .../logisland-utils/pom.xml                  | 11 +----
 logisland-docker/spark8s/Dockerfile          |  2 +-
 4 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
index f29ab2dc1..95d8a8b3f 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
@@ -205,7 +205,7 @@ public ValidationResult validate(final String subject, final String input) {
            .build();
 
    PropertyDescriptor GEOLOCATION_FIELD_LABEL = new PropertyDescriptor.Builder()
-            .name("geolocation.output.field.label")
+            .name("geolocation.output.field.name")
            .description("Label used to name the output record field for geolocation properties")
            .required(false)
            .sensitive(true)
diff --git a/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala b/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
index 124701adf..b8c5d1450 100644
--- a/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
+++ b/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
@@ -375,6 +375,19 @@ object KafkaStreamProcessingEngine {
        .required(false)
        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
        .build
+
+    val OVERWRITE_EXISTING = new AllowableValue("overwrite_existing", "overwrite existing field", "if field already exist")
+
+    val KEEP_OLD_FIELD = new AllowableValue("keep_old_field", "keep only old field value", "keep only old field")
+
+    val SPARK_CONF_POLICY = new PropertyDescriptor.Builder()
+        .name("spark.conf.properties.policy")
+        .description("What to do when a field with the same name already exists ?")
+        .required(false)
+        .defaultValue(OVERWRITE_EXISTING.getValue())
+        .allowableValues(OVERWRITE_EXISTING, KEEP_OLD_FIELD)
+        .build();
+
 }
 
 class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
@@ -392,24 +405,27 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
    override def init(context: EngineContext): Unit = {
        super.init(context)
        val engineContext = context.asInstanceOf[EngineContext]
-        val sparkMaster = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_MASTER).asString
-        val appName = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_APP_NAME).asString
        batchDurationMs = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_STREAMING_BATCH_DURATION).asInteger().intValue()
        sparkStreamingTimeout = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_STREAMING_TIMEOUT).asInteger().toInt
 
-        /**
-          * job configuration
-          */
-        conf.setAppName(appName)
-        conf.setMaster(sparkMaster)
+        val conflictPolicy = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_CONF_POLICY).asString
 
        def setConfProperty(conf: SparkConf, engineContext: EngineContext, propertyDescriptor: PropertyDescriptor) = {
-            // Need to check if the properties are set because those properties are not "requires"
            if (engineContext.getPropertyValue(propertyDescriptor).isSet) {
-                conf.set(propertyDescriptor.getName, engineContext.getPropertyValue(propertyDescriptor).asString)
+                if (conf.contains(propertyDescriptor.getName)) {
+                    logger.warn("Property " + propertyDescriptor.getName + " already present in the sparkConfiguration with value " + conf.get(propertyDescriptor.getName))
+                }
+
+                if (conflictPolicy.equals(KafkaStreamProcessingEngine.OVERWRITE_EXISTING.getValue)) {
+                    conf.set(propertyDescriptor.getName, engineContext.getPropertyValue(propertyDescriptor).asString)
+                    logger.info("Property " + propertyDescriptor.getName + " set in the sparkConfiguration with value " + conf.get(propertyDescriptor.getName))
+                }
            }
        }
+        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_APP_NAME)
+        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_MASTER)
+
        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_UI_RETAINED_BATCHES)
        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_RECEIVER_WAL_ENABLE)
        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_KAFKA_MAXRETRIES)
@@ -436,12 +452,15 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
 
        conf.set("spark.kryo.registrator", "com.hurence.logisland.util.spark.ProtoBufRegistrator")
 
-        if (sparkMaster startsWith "yarn") {
+        if (conf.get(KafkaStreamProcessingEngine.SPARK_MASTER.getName) startsWith "yarn") {
            // Note that SPARK_YARN_DEPLOYMODE is not used by spark itself but only by spark-submit CLI
            // That's why we do not need to propagate it here
            setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_YARN_QUEUE)
        }
 
+        logger.info("Configuration from logisland main")
+        logger.info(conf.toDebugString)
+
        val sparkContext = getCurrentSparkContext()
 
        UserMetricsSystem.initialize(sparkContext, "LogislandMetrics")
@@ -492,6 +511,8 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
            }
        })
 
+        val sparkMaster = conf.get(KafkaStreamProcessingEngine.SPARK_MASTER.getName)
+        val appName = conf.get(KafkaStreamProcessingEngine.SPARK_APP_NAME.getName)
        logger.info(s"spark context initialized with master:$sparkMaster, " +
            s"appName:$appName, " +
            s"batchDuration:$batchDurationMs ")
@@ -535,6 +556,7 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
        descriptors.add(KafkaStreamProcessingEngine.SPARK_MESOS_CORE_MAX)
        descriptors.add(KafkaStreamProcessingEngine.SPARK_TOTAL_EXECUTOR_CORES)
        descriptors.add(KafkaStreamProcessingEngine.SPARK_SUPERVISE)
+        descriptors.add(KafkaStreamProcessingEngine.SPARK_CONF_POLICY)
 
        Collections.unmodifiableList(descriptors)
    }
diff --git a/logisland-core/logisland-framework/logisland-utils/pom.xml b/logisland-core/logisland-framework/logisland-utils/pom.xml
index 65e812d4b..d738c9a23 100644
--- a/logisland-core/logisland-framework/logisland-utils/pom.xml
+++ b/logisland-core/logisland-framework/logisland-utils/pom.xml
@@ -156,6 +156,7 @@
                                    org.apache.avro:avro
+                                    io.confluent
@@ -183,16 +184,6 @@
-
-
diff --git a/logisland-docker/spark8s/Dockerfile b/logisland-docker/spark8s/Dockerfile
index 042c6c38a..1793cbc94 100644
--- a/logisland-docker/spark8s/Dockerfile
+++ b/logisland-docker/spark8s/Dockerfile
@@ -1,4 +1,4 @@
-FROM gcr.io/spark-operator/spark:v2.4.0
+FROM gcr.io/spark-operator/spark:v2.4.5
 
 LABEL maintainer="support@hurence.com"

From 22a433ab75df3ef948d8a0d765964c19b44b354b Mon Sep 17 00:00:00 2001
From: Enrico Mano
Date: Mon, 15 Mar 2021 14:40:04 +0100
Subject: [PATCH 3/4] Remove sensitive parameter for geolocation field name

---
 .../service/elasticsearch/ElasticsearchClientService.java   | 1 -
 .../service/elasticsearch/ElasticsearchRecordConverter.java | 6 +++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
index 95d8a8b3f..747d9dd65 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
@@ -208,7 +208,6 @@ public ValidationResult validate(final String subject, final String input) {
            .name("geolocation.output.field.name")
            .description("Label used to name the output record field for geolocation properties")
            .required(false)
-            .sensitive(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
 
diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
index dafb120f0..8b65d0319 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
@@ -49,10 +49,10 @@ static String convertToString(Record record) {
      * to be indexed later
      *e
      * @param record to convert
-     * @param geolocationLabel is the label for the geolocation field
+     * @param geolocationFieldLabel is the label for the geolocation field
      * @return the json converted record
      */
-    static String convertToString(Record record, String geolocationLabel) {
+    static String convertToString(Record record, String geolocationFieldLabel) {
        logger.trace(record.toString());
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
@@ -129,7 +129,7 @@ static String convertToString(Record record, String geolocationLabel) {
 
            if((geolocation[0] != 0) && (geolocation[1] != 0)) {
 
-                document.latlon(geolocationLabel, geolocation[0], geolocation[1]);
+                document.latlon(geolocationFieldLabel, geolocation[0], geolocation[1]);
            }
 

From 099f553bd1aa25965a853180ce94f39d329c32ba Mon Sep 17 00:00:00 2001
From: Enrico Mano
Date: Mon, 22 Mar 2021 09:46:13 +0100
Subject: [PATCH 4/4] Fix a performance issue inefficient String constructor

https://github.com/Hurence/logisland/pull/574/files/22a433ab75df3ef948d8a0d765964c19b44b354b#r598279100
---
 .../service/elasticsearch/ElasticsearchRecordConverter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
index 8b65d0319..c2a7ba43f 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
@@ -41,7 +41,7 @@ class ElasticsearchRecordConverter {
      * @return the json converted record
      */
    static String convertToString(Record record) {
-        return convertToString(record, new String("location"));
+        return convertToString(record, "location");
    }
 
    /**
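
The sketch below is not part of the patch series; it is a simplified, self-contained Java illustration of the behaviour the four commits combine to produce. The class name GeolocationLabelSketch, the String-based record stand-in, and the JSON shape it prints are invented for the example; the real logisland code operates on Record objects and writes the geolocation through document.latlon(...). Only the dispatch logic mirrors the patches: use the optional "geolocation.output.field.name" property when it is set, otherwise fall back to the historical "location" field name.

    // Simplified sketch of the converter dispatch introduced by this patch series.
    public class GeolocationLabelSketch {

        // Stand-in for ElasticsearchRecordConverter.convertToString(record):
        // PATCH 4/4 replaces new String("location") with the plain literal "location".
        static String convertToString(String recordJson) {
            return convertToString(recordJson, "location");
        }

        // Stand-in for the two-argument overload added in PATCH 1/4 (parameter
        // renamed in PATCH 3/4). The real converter calls
        // document.latlon(geolocationFieldLabel, lat, lon); here we only show
        // where the configured label ends up in the output document.
        static String convertToString(String recordJson, String geolocationFieldLabel) {
            return "{\"" + geolocationFieldLabel + "\": {\"lat\": 0, \"lon\": 0}, \"source\": " + recordJson + "}";
        }

        // Stand-in for Elasticsearch_7_x_ClientService.convertRecordToString(record):
        // the label comes from the optional "geolocation.output.field.name"
        // property and may therefore be null.
        static String convertRecordToString(String recordJson, String geolocationFieldLabel) {
            if (geolocationFieldLabel != null) {
                return convertToString(recordJson, geolocationFieldLabel);
            }
            return convertToString(recordJson);
        }

        public static void main(String[] args) {
            // Property unset: falls back to the default "location" field name.
            System.out.println(convertRecordToString("{\"id\":1}", null));
            // Property set, e.g. geolocation.output.field.name: geo_point in the job config.
            System.out.println(convertRecordToString("{\"id\":1}", "geo_point"));
        }
    }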