diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
index 29d1692b1..747d9dd65 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch-api/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchClientService.java
@@ -204,7 +204,12 @@ public ValidationResult validate(final String subject, final String input) {
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
-
+    PropertyDescriptor GEOLOCATION_FIELD_LABEL = new PropertyDescriptor.Builder()
+            .name("geolocation.output.field.name")
+            .description("Label used to name the output record field for geolocation properties")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
 
     /**
      * Put a given document in elasticsearch bulk processor.
diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
index 46133b998..c2a7ba43f 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/ElasticsearchRecordConverter.java
@@ -41,6 +41,18 @@ class ElasticsearchRecordConverter {
      * @return the json converted record
      */
     static String convertToString(Record record) {
+        return convertToString(record, "location");
+    }
+
+    /**
+     * Converts an Event into an Elasticsearch document
+     * to be indexed later
+     *
+     * @param record to convert
+     * @param geolocationFieldLabel is the label for the geolocation field
+     * @return the json converted record
+     */
+    static String convertToString(Record record, String geolocationFieldLabel) {
         logger.trace(record.toString());
         try {
             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
@@ -117,7 +129,7 @@ static String convertToString(Record record) {
 
                 if((geolocation[0] != 0) && (geolocation[1] != 0)) {
 
-                    document.latlon("geolocation", geolocation[0], geolocation[1]);
+                    document.latlon(geolocationFieldLabel, geolocation[0], geolocation[1]);
 
                 }
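
For context, the converter change keeps the old single-argument entry point and routes it through the new overload with a default label. Note that the default is "location", whereas the previously hardcoded latlon field name was "geolocation". A minimal usage sketch, assuming a populated record; since both the converter class and convertToString are package-private, a caller like this would live in the same package, and the "geo_point" label is purely illustrative:

    // Sketch only: "record" is assumed to already carry the latitude/longitude
    // data that the converter feeds into document.latlon(...).
    String withDefaultLabel = ElasticsearchRecordConverter.convertToString(record);              // geo field named "location"
    String withCustomLabel  = ElasticsearchRecordConverter.convertToString(record, "geo_point"); // geo field named "geo_point"
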
diff --git a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/Elasticsearch_7_x_ClientService.java b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/Elasticsearch_7_x_ClientService.java
index 298e07387..ceb7a11ec 100644
--- a/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/Elasticsearch_7_x_ClientService.java
+++ b/logisland-components/logisland-services/logisland-service-elasticsearch/logisland-service-elasticsearch_7_x-client/src/main/java/com/hurence/logisland/service/elasticsearch/Elasticsearch_7_x_ClientService.java
@@ -108,6 +108,7 @@ public class Elasticsearch_7_x_ClientService extends AbstractControllerService i
     private volatile String authToken;
     protected volatile transient BulkProcessor bulkProcessor;
     protected volatile Map errors = new HashMap<>();
+    private String geolocationFieldLabel;
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -129,6 +130,7 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         props.add(HOSTS);
         props.add(PROP_SSL_CONTEXT_SERVICE);
         props.add(CHARSET);
+        props.add(GEOLOCATION_FIELD_LABEL);
 
         return Collections.unmodifiableList(props);
     }
@@ -162,6 +164,7 @@ protected void createElasticsearchClient(ControllerServiceInitializationContext
         final String password = context.getPropertyValue(PASSWORD).asString();
         final String hosts = context.getPropertyValue(HOSTS).asString();
         final boolean enableSsl = context.getPropertyValue(ENABLE_SSL).asBoolean();
+        geolocationFieldLabel = context.getPropertyValue(GEOLOCATION_FIELD_LABEL).asString();
 
         esHosts = getEsHosts(hosts, enableSsl);
 
@@ -799,7 +802,7 @@ public void bulkPut(String indexName, Record record) throws DatastoreClientServi
         }
 
-        bulkPut(indexName, null, ElasticsearchRecordConverter.convertToString(record), Optional.of(record.getId()));
+        bulkPut(indexName, null, convertRecordToString(record), Optional.of(record.getId()));
     }
 
     @Override
@@ -829,6 +832,9 @@ public long queryCount(String query) {
 
     @Override
     public String convertRecordToString(Record record) {
+        if (geolocationFieldLabel != null) {
+            return ElasticsearchRecordConverter.convertToString(record, geolocationFieldLabel);
+        }
         return ElasticsearchRecordConverter.convertToString(record);
     }
diff --git a/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala b/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
index bac9b10d3..b8c5d1450 100644
--- a/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
+++ b/logisland-core/logisland-engines/logisland-engine-spark_2_X/logisland-engine-spark_2_common/src/main/scala/com/hurence/logisland/engine/spark/KafkaStreamProcessingEngine.scala
@@ -84,7 +84,7 @@ object KafkaStreamProcessingEngine {
             "local(\\[([0-9]+|\\*)(,[0-9]+)?\\])?|" +
             "spark:\\/\\/[a-z0-9\\.\\-]+(:[0-9]+)?(,[a-z0-9\\.\\-]+(:[0-9]+)?)*|" +
             "mesos:\\/\\/((zk:\\/\\/[a-z0-9\\.\\-]+:[0-9]+(,[a-z0-9\\.\\-]+:[0-9]+)*\\/mesos)|(([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+|[a-z][a-z0-9\\.\\-]+)(:[0-9]+)?))|" +
-            "k8s://.+)$")))
+            "k8s:\\/\\/.+)$")))
         .defaultValue("local[2]")
         .build
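
The k8s fix above is purely stylistic: in Java regular expressions, escaping a non-alphanumeric character such as "/" matches that literal character, so "k8s://" and "k8s:\/\/" are equivalent patterns; the change only aligns the k8s alternative with the spark:// and mesos:// branches. A minimal check of just the changed alternative (the full validator regex also accepts yarn, local, spark and mesos masters):

    import java.util.regex.Pattern;

    class K8sMasterRegexCheck {
        public static void main(String[] args) {
            // Only the k8s branch of the validator regex from the hunk above.
            Pattern k8s = Pattern.compile("^k8s:\\/\\/.+$");
            System.out.println(k8s.matcher("k8s://https://kubernetes.default.svc:443").matches()); // true
            System.out.println(k8s.matcher("k8s://").matches()); // false: ".+" requires something after the scheme
        }
    }

The next hunks continue in the same file: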
@@ -375,6 +375,19 @@
     .required(false)
     .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
     .build
+
+  val OVERWRITE_EXISTING = new AllowableValue("overwrite_existing", "overwrite existing field", "if field already exist")
+
+  val KEEP_OLD_FIELD = new AllowableValue("keep_old_field", "keep only old field value", "keep only old field")
+
+  val SPARK_CONF_POLICY = new PropertyDescriptor.Builder()
+    .name("spark.conf.properties.policy")
+    .description("What to do when a field with the same name already exists ?")
+    .required(false)
+    .defaultValue(OVERWRITE_EXISTING.getValue())
+    .allowableValues(OVERWRITE_EXISTING, KEEP_OLD_FIELD)
+    .build();
+
 }
 
 class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
@@ -392,24 +405,27 @@ override def init(context: EngineContext): Unit = {
         super.init(context)
         val engineContext = context.asInstanceOf[EngineContext]
-        val sparkMaster = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_MASTER).asString
-        val appName = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_APP_NAME).asString
         batchDurationMs = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_STREAMING_BATCH_DURATION).asInteger().intValue()
         sparkStreamingTimeout = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_STREAMING_TIMEOUT).asInteger().toInt
 
-        /**
-          * job configuration
-          */
-        conf.setAppName(appName)
-        conf.setMaster(sparkMaster)
+        val conflictPolicy = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_CONF_POLICY).asString
 
         def setConfProperty(conf: SparkConf, engineContext: EngineContext, propertyDescriptor: PropertyDescriptor) = {
-            // Need to check if the properties are set because those properties are not "requires"
            if (engineContext.getPropertyValue(propertyDescriptor).isSet) {
-                conf.set(propertyDescriptor.getName, engineContext.getPropertyValue(propertyDescriptor).asString)
+                if (conf.contains(propertyDescriptor.getName)) {
+                    logger.warn("Property " + propertyDescriptor.getName + " already present in the sparkConfiguration with value " + conf.get(propertyDescriptor.getName))
+                }
+
+                if (conflictPolicy.equals(KafkaStreamProcessingEngine.OVERWRITE_EXISTING.getValue)) {
+                    conf.set(propertyDescriptor.getName, engineContext.getPropertyValue(propertyDescriptor).asString)
+                    logger.info("Property " + propertyDescriptor.getName + " set in the sparkConfiguration with value " + conf.get(propertyDescriptor.getName))
+                }
            }
        }
 
+        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_APP_NAME)
+        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_MASTER)
+
        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_UI_RETAINED_BATCHES)
        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_RECEIVER_WAL_ENABLE)
        setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_KAFKA_MAXRETRIES)
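
The descriptors above introduce a conflict policy for Spark properties: with overwrite_existing (the default) a value from the logisland job configuration replaces one already present in the SparkConf (for example injected by spark-submit), while keep_old_field leaves the pre-existing value untouched. A plain-Java sketch of these semantics, using a Map in place of SparkConf; property names and values here are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    class ConfPolicySketch {
        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<>();
            conf.put("spark.app.name", "fromSparkSubmit"); // value already present before init

            String conflictPolicy = "overwrite_existing";  // or "keep_old_field"

            // Mirrors setConfProperty: warn on a clash, then apply the policy.
            if (conf.containsKey("spark.app.name")) {
                System.out.println("Property spark.app.name already present with value " + conf.get("spark.app.name"));
            }
            if (conflictPolicy.equals("overwrite_existing")) {
                conf.put("spark.app.name", "fromLogislandConf"); // job configuration wins
            }
            System.out.println(conf.get("spark.app.name")); // "fromLogislandConf" under overwrite_existing
        }
    }
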
@@ -436,19 +452,19 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
 
        conf.set("spark.kryo.registrator", "com.hurence.logisland.util.spark.ProtoBufRegistrator")
 
-        if (sparkMaster startsWith "yarn") {
+        if (conf.get(KafkaStreamProcessingEngine.SPARK_MASTER.getName) startsWith "yarn") {
            // Note that SPARK_YARN_DEPLOYMODE is not used by spark itself but only by spark-submit CLI
            // That's why we do not need to propagate it here
            setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_YARN_QUEUE)
        }
 
+        logger.info("Configuration from logisland main")
+        logger.info(conf.toDebugString)
+
        val sparkContext = getCurrentSparkContext()
 
        UserMetricsSystem.initialize(sparkContext, "LogislandMetrics")
 
-
-
-
        /**
          * shutdown context gracefully
          */
@@ -495,6 +511,8 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
            }
        })
 
+        val sparkMaster = conf.get(KafkaStreamProcessingEngine.SPARK_MASTER.getName)
+        val appName = conf.get(KafkaStreamProcessingEngine.SPARK_APP_NAME.getName)
        logger.info(s"spark context initialized with master:$sparkMaster, " +
            s"appName:$appName, " +
            s"batchDuration:$batchDurationMs ")
@@ -538,6 +556,7 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
        descriptors.add(KafkaStreamProcessingEngine.SPARK_MESOS_CORE_MAX)
        descriptors.add(KafkaStreamProcessingEngine.SPARK_TOTAL_EXECUTOR_CORES)
        descriptors.add(KafkaStreamProcessingEngine.SPARK_SUPERVISE)
+        descriptors.add(KafkaStreamProcessingEngine.SPARK_CONF_POLICY)
 
        Collections.unmodifiableList(descriptors)
    }
diff --git a/logisland-core/logisland-framework/logisland-utils/pom.xml b/logisland-core/logisland-framework/logisland-utils/pom.xml
index 65e812d4b..d738c9a23 100644
--- a/logisland-core/logisland-framework/logisland-utils/pom.xml
+++ b/logisland-core/logisland-framework/logisland-utils/pom.xml
@@ -156,6 +156,7 @@
                                        org.apache.avro:avro
+                                        io.confluent
@@ -183,16 +184,6 @@
-
-
diff --git a/logisland-docker/spark8s/Dockerfile b/logisland-docker/spark8s/Dockerfile
index 042c6c38a..1793cbc94 100644
--- a/logisland-docker/spark8s/Dockerfile
+++ b/logisland-docker/spark8s/Dockerfile
@@ -1,4 +1,4 @@
-FROM gcr.io/spark-operator/spark:v2.4.0
+FROM gcr.io/spark-operator/spark:v2.4.5
 
 LABEL maintainer="support@hurence.com"
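
One consequence of routing spark.master and spark.app.name through setConfProperty is that later code no longer relies on local variables and instead reads both values back from the SparkConf, as the @@ -495 hunk above shows. A small stand-alone sketch of that read-back pattern against the real SparkConf API; it assumes (as the patch does) that the descriptor names map to the standard Spark property keys:

    import org.apache.spark.SparkConf;

    class ReadBackSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .set("spark.master", "local[2]")
                    .set("spark.app.name", "logisland-job");

            String sparkMaster = conf.get("spark.master");
            String appName = conf.get("spark.app.name");
            System.out.println("master=" + sparkMaster + ", appName=" + appName);
            System.out.println(conf.toDebugString()); // what the patch now logs at startup
        }
    }
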