Saas/standard events feed #574

Merged
merged 4 commits on Apr 29, 2021
Changes from 3 commits
@@ -204,7 +204,12 @@ public ValidationResult validate(final String subject, final String input) {
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();


PropertyDescriptor GEOLOCATION_FIELD_LABEL = new PropertyDescriptor.Builder()
.name("geolocation.output.field.name")
.description("Label used to name the output record field for geolocation properties")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

/**
* Put a given document in elasticsearch bulk processor.
@@ -41,6 +41,18 @@ class ElasticsearchRecordConverter {
* @return the json converted record
*/
static String convertToString(Record record) {
return convertToString(record, new String("location"));


This code is problematic. Performance finding DM_STRING_CTOR: the method invokes the inefficient new String(String) constructor.
Using the java.lang.String(String) constructor wastes memory because the constructed object is functionally indistinguishable from the String passed as a parameter. Just use the argument String directly.
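
A minimal sketch of the suggested fix, assuming the default field label stays "location":

    static String convertToString(Record record) {
        // Pass the string literal directly; new String("location") only allocates a redundant copy
        return convertToString(record, "location");
    }

String literals are already shared safely by the JVM, so a defensive copy buys nothing here.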

}

/**
* Converts an Event into an Elasticsearch document
* to be indexed later
*
* @param record to convert
* @param geolocationFieldLabel is the label for the geolocation field
* @return the json converted record
*/
static String convertToString(Record record, String geolocationFieldLabel) {
logger.trace(record.toString());
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
@@ -117,7 +129,7 @@ static String convertToString(Record record) {


if((geolocation[0] != 0) && (geolocation[1] != 0)) {
document.latlon("geolocation", geolocation[0], geolocation[1]);
document.latlon(geolocationFieldLabel, geolocation[0], geolocation[1]);
}


@@ -108,6 +108,7 @@ public class Elasticsearch_7_x_ClientService extends AbstractControllerService i
private volatile String authToken;
protected volatile transient BulkProcessor bulkProcessor;
protected volatile Map<String/*id*/, String/*errors*/> errors = new HashMap<>();
private String geolocationFieldLabel;

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -129,6 +130,7 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
props.add(HOSTS);
props.add(PROP_SSL_CONTEXT_SERVICE);
props.add(CHARSET);
props.add(GEOLOCATION_FIELD_LABEL);

return Collections.unmodifiableList(props);
}
@@ -162,6 +164,7 @@ protected void createElasticsearchClient(ControllerServiceInitializationContext
final String password = context.getPropertyValue(PASSWORD).asString();
final String hosts = context.getPropertyValue(HOSTS).asString();
final boolean enableSsl = context.getPropertyValue(ENABLE_SSL).asBoolean();
geolocationFieldLabel = context.getPropertyValue(GEOLOCATION_FIELD_LABEL).asString();

esHosts = getEsHosts(hosts, enableSsl);

@@ -799,7 +802,7 @@ public void bulkPut(String indexName, Record record) throws DatastoreClientServi

}

bulkPut(indexName, null, ElasticsearchRecordConverter.convertToString(record), Optional.of(record.getId()));
bulkPut(indexName, null, convertRecordToString(record), Optional.of(record.getId()));
}

@Override
@@ -829,6 +832,9 @@ public long queryCount(String query) {

@Override
public String convertRecordToString(Record record) {
if (geolocationFieldLabel != null) {
return ElasticsearchRecordConverter.convertToString(record, geolocationFieldLabel);
}
return ElasticsearchRecordConverter.convertToString(record);
}

@@ -84,7 +84,7 @@ object KafkaStreamProcessingEngine {
"local(\\[([0-9]+|\\*)(,[0-9]+)?\\])?|" +
"spark:\\/\\/[a-z0-9\\.\\-]+(:[0-9]+)?(,[a-z0-9\\.\\-]+(:[0-9]+)?)*|" +
"mesos:\\/\\/((zk:\\/\\/[a-z0-9\\.\\-]+:[0-9]+(,[a-z0-9\\.\\-]+:[0-9]+)*\\/mesos)|(([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+|[a-z][a-z0-9\\.\\-]+)(:[0-9]+)?))|" +
"k8s://.+)$")))
"k8s:\\/\\/.+)$")))
.defaultValue("local[2]")
.build

@@ -375,6 +375,19 @@ object KafkaStreamProcessingEngine {
.required(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build

val OVERWRITE_EXISTING = new AllowableValue("overwrite_existing", "overwrite existing field", "if field already exist")

val KEEP_OLD_FIELD = new AllowableValue("keep_old_field", "keep only old field value", "keep only old field")

val SPARK_CONF_POLICY = new PropertyDescriptor.Builder()
.name("spark.conf.properties.policy")
.description("What to do when a field with the same name already exists ?")
.required(false)
.defaultValue(OVERWRITE_EXISTING.getValue())
.allowableValues(OVERWRITE_EXISTING, KEEP_OLD_FIELD)
.build();

}

class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
@@ -392,24 +405,27 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
override def init(context: EngineContext): Unit = {
super.init(context)
val engineContext = context.asInstanceOf[EngineContext]
val sparkMaster = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_MASTER).asString
val appName = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_APP_NAME).asString
batchDurationMs = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_STREAMING_BATCH_DURATION).asInteger().intValue()
sparkStreamingTimeout = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_STREAMING_TIMEOUT).asInteger().toInt
/**
* job configuration
*/
conf.setAppName(appName)
conf.setMaster(sparkMaster)
val conflictPolicy = engineContext.getPropertyValue(KafkaStreamProcessingEngine.SPARK_CONF_POLICY).asString

def setConfProperty(conf: SparkConf, engineContext: EngineContext, propertyDescriptor: PropertyDescriptor) = {

// Need to check if the properties are set because those properties are not "required"
if (engineContext.getPropertyValue(propertyDescriptor).isSet) {
conf.set(propertyDescriptor.getName, engineContext.getPropertyValue(propertyDescriptor).asString)
if (conf.contains(propertyDescriptor.getName)) {
logger.warn("Property " + propertyDescriptor.getName + " already present in the sparkConfiguration with value " + conf.get(propertyDescriptor.getName))
}

if (conflictPolicy.equals(KafkaStreamProcessingEngine.OVERWRITE_EXISTING.getValue)) {
conf.set(propertyDescriptor.getName, engineContext.getPropertyValue(propertyDescriptor).asString)
logger.info("Property " + propertyDescriptor.getName + " set in the sparkConfiguration with value " + conf.get(propertyDescriptor.getName))
}
}
}

setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_APP_NAME)
setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_MASTER)

setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_UI_RETAINED_BATCHES)
setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_RECEIVER_WAL_ENABLE)
setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_STREAMING_KAFKA_MAXRETRIES)
@@ -436,19 +452,19 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {

conf.set("spark.kryo.registrator", "com.hurence.logisland.util.spark.ProtoBufRegistrator")

if (sparkMaster startsWith "yarn") {
if (conf.get(KafkaStreamProcessingEngine.SPARK_MASTER.getName) startsWith "yarn") {
// Note that SPARK_YARN_DEPLOYMODE is not used by spark itself but only by spark-submit CLI
// That's why we do not need to propagate it here
setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_YARN_QUEUE)
}

logger.info("Configuration from logisland main")
logger.info(conf.toDebugString)

val sparkContext = getCurrentSparkContext()

UserMetricsSystem.initialize(sparkContext, "LogislandMetrics")




/**
* shutdown context gracefully
*/
@@ -495,6 +511,8 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
}
})

val sparkMaster = conf.get(KafkaStreamProcessingEngine.SPARK_MASTER.getName)
val appName = conf.get(KafkaStreamProcessingEngine.SPARK_APP_NAME.getName)
logger.info(s"spark context initialized with master:$sparkMaster, " +
s"appName:$appName, " +
s"batchDuration:$batchDurationMs ")
@@ -538,6 +556,7 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
descriptors.add(KafkaStreamProcessingEngine.SPARK_MESOS_CORE_MAX)
descriptors.add(KafkaStreamProcessingEngine.SPARK_TOTAL_EXECUTOR_CORES)
descriptors.add(KafkaStreamProcessingEngine.SPARK_SUPERVISE)
descriptors.add(KafkaStreamProcessingEngine.SPARK_CONF_POLICY)
Collections.unmodifiableList(descriptors)
}

11 changes: 1 addition & 10 deletions logisland-core/logisland-framework/logisland-utils/pom.xml
@@ -156,6 +156,7 @@
<artifactSet>
<includes>
<include>org.apache.avro:avro</include>
<include>io.confluent</include>
</includes>
</artifactSet>
<filters>
@@ -183,16 +184,6 @@
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<!--
<relocations>
<relocation>
<pattern>org.apache.avro</pattern>
<shadedPattern>logisland.repackaged.org.apache.avro</shadedPattern>
</relocation>

</relocations>
-->

</configuration>
</execution>
</executions>
2 changes: 1 addition & 1 deletion logisland-docker/spark8s/Dockerfile
@@ -1,4 +1,4 @@
FROM gcr.io/spark-operator/spark:v2.4.0
FROM gcr.io/spark-operator/spark:v2.4.5

LABEL maintainer="support@hurence.com"
