Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flink load default configuration #5025

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,11 @@ object FlinkEnvConfiguration {
val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)

val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")

val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)

val FLINK_ENV_JAVA_OPTS =
CommonVars("flink.env.java.opts", "env.java.opts")

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext}
import org.apache.linkis.engineconnplugin.flink.setting.Settings
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil}
import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{
Expand All @@ -55,16 +55,18 @@ import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}

import java.io.File
import java.io.{File, FileNotFoundException}
import java.net.URL
import java.text.MessageFormat
import java.time.Duration
import java.util
import java.util.{Collections, Locale}

import scala.collection.JavaConverters._
import scala.io.Source

import com.google.common.collect.{Lists, Sets}
import org.yaml.snakeyaml.Yaml

class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging {

Expand Down Expand Up @@ -196,7 +198,15 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
// set extra configs
options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value)
case (key, value) =>
var flinkConfigValue = value
if (
FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
.equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
) {
flinkConfigValue = getExtractJavaOpts(value)
}
flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue)
}
// set kerberos config
if (FLINK_KERBEROS_ENABLE.getValue(options)) {
Expand Down Expand Up @@ -295,6 +305,44 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
context
}

private def getExtractJavaOpts(envJavaOpts: String): String = {
var defaultJavaOpts = ""
val yamlFilePath = FLINK_CONF_DIR.getValue
val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
if (new File(yamlFile).exists()) {
val source = Source.fromFile(yamlFile)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
val inputStream = getClass.getResourceAsStream(yamlFile)
if (inputStream != null) {
val source = Source.fromInputStream(inputStream)
try {
val yamlContent = source.mkString
val yaml = new Yaml()
val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
}
} finally {
source.close()
}
} else {
throw new FileNotFoundException("YAML file not found in both file system and classpath.")
}
}
val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
merged
}

protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = {
val engineConnModeLabel = getEngineConnModeLabel(labels)
engineConnModeLabel != null && (EngineConnMode.toEngineConnMode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,61 @@ object FlinkValueFormatUtil {
case _ => null
}

def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = {
val patternX = """-XX:([^\s]+)=([^\s]+)""".r
val keyValueMapX = patternX
.findAllMatchIn(envJavaOpts)
.map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}
.toMap

val patternD = """-D([^\s]+)=([^\s]+)""".r
val keyValueMapD = patternD
.findAllMatchIn(envJavaOpts)
.map { matchResult =>
val key = matchResult.group(1)
val value = matchResult.group(2)
(key, value)
}
.toMap
val xloggcPattern = """-Xloggc:[^\s]+""".r
val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString
val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString
var escapedXloggcValue = ""
var replaceStr1 = ""
var replaceStr2 = ""
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
escapedXloggcValue = xloggcValueStr2.replace("\\<", "<").replace("\\>", ">")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "")
}
if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
escapedXloggcValue = xloggcValueStr1.replace("\\<", "<").replace("\\>", ">")
replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
replaceStr2 = envJavaOpts
}
if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
replaceStr1 = defaultJavaOpts
replaceStr2 = envJavaOpts
}
val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}

val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) =>
val (key, value) = entry
val oldValue = s"$key=[^\\s]+"
val newValue = key + "=" + value
result.replaceAll(oldValue, newValue)
}
val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ")
javaOpts
}

}
Loading