Feature/logisland 587 support for structured stream checkpointing on azure filesystem #590

Merged
@@ -189,7 +189,9 @@ http://www.w3.org/2001/XMLSchema-instance ">
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/license/**</exclude>
<exclude>META-INF/*</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/maven/**</exclude>
<exclude>LICENSE</exclude>
<exclude>NOTICE</exclude>
@@ -24,6 +24,8 @@ http://www.w3.org/2001/XMLSchema-instance ">
<scala.version>2.11.8</scala.version>
<jackson.version>2.6.6</jackson.version>
<eventhubs.version>2.3.14.1</eventhubs.version>
<hadoop-azure.version>2.7.0</hadoop-azure.version>
<hadoop-common.version>2.7.0</hadoop-common.version>
</properties>


@@ -388,6 +390,7 @@ http://www.w3.org/2001/XMLSchema-instance ">
<version>5.1.3.RELEASE</version>
</dependency>

<!-- Allows interacting with Azure Event Hubs -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-spark_${scala.binary.version}</artifactId>
@@ -400,6 +403,19 @@ http://www.w3.org/2001/XMLSchema-instance ">
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- hadoop-azure and hadoop-common allow interacting with Azure storage (e.g. for checkpointing) -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>
<version>${hadoop-azure.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop-common.version}</version>
</dependency>

</dependencies>

<build>
@@ -31,34 +31,30 @@

package com.hurence.logisland.engine.spark


import java.util
import java.util.concurrent.Executors
import java.util.regex.Pattern
import java.util.stream.Collectors
import java.util.{Collections, UUID}

import com.hurence.logisland.component.{AllowableValue, ComponentContext, InitializationException, PropertyDescriptor}
import java.util.{Collections, Map, UUID}
import com.hurence.logisland.component.{AllowableValue, PropertyDescriptor}
import com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine.SPARK_CUSTOM_CONFIG_PREFIX
import com.hurence.logisland.engine.spark.remote.PipelineConfigurationBroadcastWrapper
import com.hurence.logisland.engine.{AbstractProcessingEngine, EngineContext}
import com.hurence.logisland.stream.StreamContext
import com.hurence.logisland.stream.spark.{AbstractKafkaRecordStream, SparkRecordStream, SparkStreamContext}
import com.hurence.logisland.stream.spark.{SparkRecordStream, SparkStreamContext}
import com.hurence.logisland.util.spark.ControllerServiceLookupSink
import com.hurence.logisland.validator.StandardValidators
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.groupon.metrics.UserMetricsSystem
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._


object KafkaStreamProcessingEngine {


val SPARK_PROPERTIES_FILE_PATH: PropertyDescriptor = new PropertyDescriptor.Builder() // Not used in code but in the logisland.sh script, so it must be present!
.name("spark.properties.file.path")
.description("for using --properties-file option while submitting spark job")
@@ -272,7 +268,6 @@ object KafkaStreamProcessingEngine {
.defaultValue("4")
.build


val SPARK_YARN_AM_ATTEMPT_FAILURES_VALIDITY_INTERVAL = new PropertyDescriptor.Builder()
.name("spark.yarn.am.attemptFailuresValidityInterval")
.description("If the application runs for days or weeks without restart " +
@@ -388,6 +383,7 @@ object KafkaStreamProcessingEngine {
.allowableValues(OVERWRITE_EXISTING, KEEP_OLD_FIELD)
.build();

val SPARK_CUSTOM_CONFIG_PREFIX = "spark.custom.config."
}

class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
@@ -458,6 +454,8 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
setConfProperty(conf, engineContext, KafkaStreamProcessingEngine.SPARK_YARN_QUEUE)
}

handleDynamicProperties(context.getProperties)

logger.info("Configuration from logisland main")
logger.info(conf.toDebugString)

@@ -474,10 +472,8 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
logger.info("Application stopped")
}


PipelineConfigurationBroadcastWrapper.getInstance().refresh(engineContext, sparkContext)


SQLContext.getOrCreate(getCurrentSparkContext()).streams.addListener(new StreamingQueryListener {

val runMap = scala.collection.mutable.Map[UUID, String]()
@@ -519,6 +515,24 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
logger.info(s"conf : ${conf.toDebugString}")
}

protected def handleDynamicProperties(properties : Map[PropertyDescriptor, String]) = {

properties.foreach(propertyAndValue => {

val propertyDescriptor: PropertyDescriptor = propertyAndValue._1
val propertyName: String = propertyDescriptor.getName
// Handle any dynamic property of the form 'spark.custom.config.XXX: someValue',
// which is passed on to the Spark configuration as the property 'XXX=someValue'
if (propertyDescriptor.isDynamic && propertyName.startsWith(SPARK_CUSTOM_CONFIG_PREFIX)) {
val customSparkConfigKey: String = propertyName.substring(SPARK_CUSTOM_CONFIG_PREFIX.length)
if (customSparkConfigKey.length > 0) { // Ignore silly 'spark.custom.config.: missing_custom_key' property
val customSparkConfigValue: String = propertyAndValue._2
conf.set(customSparkConfigKey, customSparkConfigValue)
}
}
})
}
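
// Illustration only, not part of this PR: a minimal sketch of the key mapping performed by
// handleDynamicProperties above. A dynamic property named 'spark.custom.config.XXX' becomes
// the Spark configuration entry 'XXX'; the helper name below is hypothetical.
private def customSparkConfigEntrySketch(propertyName: String, value: String): Option[(String, String)] = {
  if (propertyName.startsWith(SPARK_CUSTOM_CONFIG_PREFIX) && propertyName.length > SPARK_CUSTOM_CONFIG_PREFIX.length) {
    Some(propertyName.substring(SPARK_CUSTOM_CONFIG_PREFIX.length) -> value)
  } else {
    None // no prefix, or an empty custom key: nothing to forward to the Spark configuration
  }
}
// e.g. customSparkConfigEntrySketch("spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net", "<storage-account-key>")
// returns Some((fs.azure.account.key.<myStorageAccount>.blob.core.windows.net, <storage-account-key>))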

override def getSupportedPropertyDescriptors: util.List[PropertyDescriptor] = {
val descriptors: util.List[PropertyDescriptor] = new util.ArrayList[PropertyDescriptor]
descriptors.add(KafkaStreamProcessingEngine.SPARK_APP_NAME)
@@ -557,9 +571,26 @@ class KafkaStreamProcessingEngine extends AbstractProcessingEngine {
descriptors.add(KafkaStreamProcessingEngine.SPARK_TOTAL_EXECUTOR_CORES)
descriptors.add(KafkaStreamProcessingEngine.SPARK_SUPERVISE)
descriptors.add(KafkaStreamProcessingEngine.SPARK_CONF_POLICY)

Collections.unmodifiableList(descriptors)
}

override def getSupportedDynamicPropertyDescriptor(propertyDescriptorName: String): PropertyDescriptor = {

// Support any custom spark configuration.
// The property name must start with SPARK_CUSTOM_CONFIG_PREFIX; the rest
// of the name is the actual key passed to the spark configuration
if (propertyDescriptorName.startsWith(SPARK_CUSTOM_CONFIG_PREFIX)) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.expressionLanguageSupported(false)
.required(false)
.dynamic(true)
.build
}
return null
}
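
// Illustration only, not part of this PR: only names carrying the custom-config prefix are
// accepted as dynamic properties, anything else is rejected, e.g.
// getSupportedDynamicPropertyDescriptor("spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net") // -> dynamic descriptor
// getSupportedDynamicPropertyDescriptor("some.unrelated.property") // -> null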


/**
* start the engine
@@ -44,6 +44,7 @@ class StructuredStream extends AbstractRecordStream with SparkRecordStream {
descriptors.add(WRITE_STREAM_SERVICE_PROVIDER)
descriptors.add(GROUP_BY_FIELDS)
descriptors.add(SPARK_BASE_CHECKPOINT_PATH)
descriptors.add(SPARK_SQL_SHUFFLE_PARTITIONS)
// descriptors.add(STATE_TIMEOUT_DURATION_MS)
// descriptors.add(STATE_TIMEOUT_DURATION_MS)
// descriptors.add(STATEFULL_OUTPUT_MODE)
@@ -88,7 +89,8 @@ class StructuredStream extends AbstractRecordStream with SparkRecordStream {
val pipelineMetricPrefix = context.getIdentifier /*+ ".partition" + partitionId*/ + "."
val pipelineTimerContext = UserMetricsSystem.timer(pipelineMetricPrefix + "Pipeline.processing_time_ms").time()

sparkSession.sqlContext.setConf("spark.sql.shuffle.partitions", "4")//TODO make this configurable
sparkSession.sqlContext.setConf("spark.sql.shuffle.partitions",
context.getPropertyValue(SPARK_SQL_SHUFFLE_PARTITIONS).asString())

//TODO I think these two lines serve no purpose
val controllerServiceLookup = sparkStreamContext.broadCastedControllerServiceLookupSink.value.getControllerServiceLookup()
@@ -257,11 +259,22 @@ object StructuredStream {

val SPARK_BASE_CHECKPOINT_PATH: PropertyDescriptor = new PropertyDescriptor.Builder()
.name("spark.base.checkpoint.path")
.description("Path to store all checkpoint for all sink, they will b stored in" +
" ${spark.base.checkpoint.path}/${stream_id}/${service_id}")
.description("Path to store all checkpoint for all sink, they will be stored in" +
" ${spark.base.checkpoint.path}/${stream_id}/${service_id}. If using for instance azure filesystem to" +
" checkpoint, then use KafkaStreamProcessingEngine spark.custom.config.xxx property like to set filesystem" +
" credentials. Suppose you set something like `spark.base.checkpoint.path: wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing`" +
" in StructuredStream configuration, then you can set the matching SAS key under the root KafkaStreamProcessingEngine configuration" +
" with something like `spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net: +H5IuOtsebY7fO6QyyntmlRLe3G8Rv0jcye6kzE2Wz4NrU3IdB4Q8ocJY2ScY9cQrJNXxUg2WbYJPndMuQWUCQ==`")
.required(false)
.defaultValue("checkpoints")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build;
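
// Illustration only, not part of this PR: the description above pairs two configuration entries
// (placeholder values shown):
//   StructuredStream:            spark.base.checkpoint.path: wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing
//   KafkaStreamProcessingEngine: spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net: <storage-account-key>
// With the hadoop-azure and hadoop-common dependencies added by this PR, Spark can then resolve
// the wasbs:// scheme and write each sink's checkpoints under
//   wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing/<stream_id>/<service_id>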

val SPARK_SQL_SHUFFLE_PARTITIONS: PropertyDescriptor = new PropertyDescriptor.Builder()
.name("spark.sql.shuffle.partitions")
.description("Regular spark.sql.shuffle.partitions. Defaults to 200")
.required(false)
.defaultValue("200")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build;
}