[KYUUBI-122] Obtain delegation tokens from possible kerberized services #123

Merged · 8 commits · Dec 6, 2018
28 changes: 16 additions & 12 deletions kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
@@ -39,26 +39,30 @@ import yaooqinn.kyuubi.Logging
 object KyuubiSparkUtil extends Logging {
   // PREFIXES
   val SPARK_PREFIX = "spark."
-  private[this] val YARN_PREFIX = "yarn."
-  private[this] val HADOOP_PRFIX = "hadoop."
+  private val YARN_PREFIX = "yarn."
+  private val HADOOP_PRFIX = "hadoop."
   val SPARK_HADOOP_PREFIX: String = SPARK_PREFIX + HADOOP_PRFIX
-  private[this] val DRIVER_PREFIX = "driver."
-  private[this] val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am."
+  private val SPARK_YARN_PREFIX: String = SPARK_PREFIX + YARN_PREFIX
+  private val DRIVER_PREFIX = "driver."
+  private val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am."

-  private[this] val UI_PREFIX = "ui."
-  private[this] val SQL_PREFIX = "sql."
-  private[this] val HIVE_PREFIX = "hive."
-  private[this] val METASTORE_PREFIX = "metastore."
+  private val UI_PREFIX = "ui."
+  private val SQL_PREFIX = "sql."
+  private val HIVE_PREFIX = "hive."
+  private val METASTORE_PREFIX = "metastore."

   // ENVIRONMENTS
   val SPARK_HOME: String = System.getenv("SPARK_HOME")
   val SPARK_JARS_DIR: String = SPARK_HOME + File.separator + "jars"

   // YARN
-  val KEYTAB: String = SPARK_PREFIX + YARN_PREFIX + "keytab"
-  val PRINCIPAL: String = SPARK_PREFIX + YARN_PREFIX + "principal"
-  val MAX_APP_ATTEMPTS: String = SPARK_PREFIX + YARN_PREFIX + "maxAppAttempts"
-  val SPARK_YARN_JARS: String = SPARK_PREFIX + YARN_PREFIX + "jars"
+  val KEYTAB: String = SPARK_YARN_PREFIX + "keytab"
+  val PRINCIPAL: String = SPARK_YARN_PREFIX + "principal"
+  val MAX_APP_ATTEMPTS: String = SPARK_YARN_PREFIX + "maxAppAttempts"
+  val SPARK_YARN_JARS: String = SPARK_YARN_PREFIX + "jars"
+  val ACCESS_NNS: String = SPARK_YARN_PREFIX + "access.namenodes"
+  val ACCESS_FSS: String = SPARK_YARN_PREFIX + "access.hadoopFileSystems"
+  val STAGING_DIR: String = SPARK_YARN_PREFIX + "stagingDir"

   // DRIVER
   val DRIVER_BIND_ADDR: String = SPARK_PREFIX + DRIVER_PREFIX + "bindAddress"
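The new ACCESS_FSS/ACCESS_NNS/STAGING_DIR keys name standard Spark-on-YARN settings. A minimal sketch of how a deployment might point Kyuubi at extra secured filesystems through these keys (host names are hypothetical):

import org.apache.spark.SparkConf

object AccessConfSketch {
  def main(args: Array[String]): Unit = {
    // Comma-separated list of every secured filesystem the session must reach.
    val conf = new SparkConf()
      .set("spark.yarn.access.hadoopFileSystems", "hdfs://nn1:8020,hdfs://nn2:8020")
    // HDFSTokenCollector (added below) falls back to the older "namenodes"
    // key when the newer "hadoopFileSystems" key is absent.
    val target = conf.getOption("spark.yarn.access.hadoopFileSystems")
      .orElse(conf.getOption("spark.yarn.access.namenodes"))
    println(target.getOrElse("default filesystem only"))
  }
}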
@@ -23,7 +23,7 @@ package yaooqinn.kyuubi.service
  */
 class ServiceException(message: String, cause: Throwable) extends RuntimeException(message, cause) {

-  def this(cause: Throwable) = this("", cause)
+  def this(cause: Throwable) = this(cause.toString, cause)

   def this(message: String) = this(message, null)
 }
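A small sketch of what the constructor change buys: the wrapped cause's description now surfaces in getMessage instead of an empty string.

import yaooqinn.kyuubi.service.ServiceException

object ServiceExceptionSketch {
  def main(args: Array[String]): Unit = {
    val cause = new IllegalStateException("token renewer missing")
    val e = new ServiceException(cause)
    // Previously printed an empty string; now prints
    // "java.lang.IllegalStateException: token renewer missing".
    println(e.getMessage)
  }
}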
@@ -34,7 +34,9 @@ import yaooqinn.kyuubi.auth.KyuubiAuthFactory
 import yaooqinn.kyuubi.cli._
 import yaooqinn.kyuubi.operation.{KyuubiOperation, OperationHandle, OperationManager}
 import yaooqinn.kyuubi.schema.RowSet
+import yaooqinn.kyuubi.session.security.TokenCollector
 import yaooqinn.kyuubi.spark.SparkSessionWithUGI
+import yaooqinn.kyuubi.utils.KyuubiHadoopUtil

 /**
  * An Execution Session with [[SparkSession]] instance inside, which shares [[SparkContext]]
@@ -73,7 +75,9 @@ private[kyuubi] class KyuubiSession(
         // Do not check keytab file existing as spark-submit has it done
         currentUser.reloginFromKeytab()
       }
-      UserGroupInformation.createProxyUser(username, currentUser)
+      val user = UserGroupInformation.createProxyUser(username, currentUser)
+      KyuubiHadoopUtil.doAs(user)(TokenCollector.obtainTokenIfRequired(conf))
+      user
     } else {
       UserGroupInformation.createRemoteUser(username)
     }
@@ -276,7 +280,7 @@ private[kyuubi] class KyuubiSession(
     }
   }

-  private[this] def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = {
+  private def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = {
     acquire(false)
     try {
       operations.foreach { op =>
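The session now fetches tokens inside a doAs block so that delegation tokens land in the proxy user's credentials rather than the server's. A stripped-down sketch of that impersonation pattern using plain UserGroupInformation (the user name is hypothetical):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object ProxyUserSketch {
  def main(args: Array[String]): Unit = {
    // The server's own (possibly kerberized) identity.
    val realUser = UserGroupInformation.getCurrentUser
    // Impersonate the connecting session user.
    val proxy = UserGroupInformation.createProxyUser("alice", realUser)
    proxy.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        // Code here runs as "alice": tokens obtained now attach to her
        // credentials, which is what obtainTokenIfRequired relies on.
        println(UserGroupInformation.getCurrentUser.getUserName)
      }
    })
  }
}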
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.session.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.KyuubiSparkUtil._
import org.apache.spark.SparkConf

import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.service.ServiceException

/**
 * Token collector for secured HDFS FileSystems
 */
private[security] object HDFSTokenCollector extends TokenCollector with Logging {

  private def hadoopFStoAccess(conf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = {
    val fileSystems = conf.getOption(ACCESS_FSS)
      .orElse(conf.getOption(ACCESS_NNS)) match {
      case Some(nns) => nns.split(",").map(new Path(_).getFileSystem(hadoopConf)).toSet
      case _ => Set.empty[FileSystem]
    }

    fileSystems +
      conf.getOption(STAGING_DIR).map(new Path(_).getFileSystem(hadoopConf))
        .getOrElse(FileSystem.get(hadoopConf))
  }

  private def renewer(hadoopConf: Configuration): String = {
    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
    debug("Delegation token renewer is: " + tokenRenewer)

    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
      error(errorMessage)
      throw new ServiceException(errorMessage)
    }
    tokenRenewer
  }

  override def obtainTokens(conf: SparkConf): Unit = {
    val hadoopConf = newConfiguration(conf)
    val tokenRenewer = renewer(hadoopConf)
    val creds = new Credentials()
    hadoopFStoAccess(conf, hadoopConf).foreach { fs =>
      fs.addDelegationTokens(tokenRenewer, creds)
    }
    UserGroupInformation.getCurrentUser.addCredentials(creds)
  }
}
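addDelegationTokens asks each filesystem's namenode for a token renewable by the YARN master principal and stuffs it into creds. One way to inspect what ended up on the current user, as a sketch:

import scala.collection.JavaConverters._

import org.apache.hadoop.security.UserGroupInformation

object TokenDumpSketch {
  def main(args: Array[String]): Unit = {
    // After obtainTokens runs, each secured filesystem contributes one
    // delegation token keyed by its service address.
    val creds = UserGroupInformation.getCurrentUser.getCredentials
    creds.getAllTokens.asScala.foreach { token =>
      println(token.getKind + " for " + token.getService)
    }
  }
}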
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.session.security

import scala.util.control.NonFatal

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
import org.apache.spark.SparkConf

import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.utils.KyuubiHadoopUtil
import yaooqinn.kyuubi.utils.KyuubiHiveUtil._

private[security] object HiveTokenCollector extends TokenCollector with Logging {

  override def obtainTokens(conf: SparkConf): Unit = {
    try {
      val c = hiveConf(conf)
      val principal = c.getTrimmed(METASTORE_PRINCIPAL)
      val uris = c.getTrimmed(URIS)
      require(StringUtils.isNotEmpty(principal), METASTORE_PRINCIPAL + " undefined")
      require(StringUtils.isNotEmpty(uris), URIS + " undefined")
      val currentUser = UserGroupInformation.getCurrentUser.getUserName
      val credentials = new Credentials()
      KyuubiHadoopUtil.doAsRealUser {
        val hive = Hive.get(c, classOf[HiveConf])
        info(s"Getting token from Hive Metastore for owner $currentUser via $principal")
        val tokenString = hive.getDelegationToken(currentUser, principal)
        val token = new Token[DelegationTokenIdentifier]
        token.decodeFromUrlString(tokenString)
        info("Got " + DelegationTokenIdentifier.stringifyToken(token))
        credentials.addToken(new Text("hive.metastore.delegation.token"), token)
      }
      UserGroupInformation.getCurrentUser.addCredentials(credentials)
    } catch {
      case NonFatal(e) =>
        error("Failed to get token from Hive metastore service", e)
    } finally {
      Hive.closeCurrent()
    }
  }

  override def tokensRequired(conf: SparkConf): Boolean = {
    UserGroupInformation.isSecurityEnabled && StringUtils.isNotBlank(hiveConf(conf).get(URIS))
  }
}
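tokensRequired makes metastore tokens strictly opt-in: Kerberos must be on and hive.metastore.uris non-blank. A sketch of that gating, assuming spark.hadoop.* keys are copied into the Hive configuration by newConfiguration and that the sketch lives in the security package (URI is a placeholder):

package yaooqinn.kyuubi.session.security

import org.apache.spark.SparkConf

object HiveGateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // No metastore URI configured: the collector is skipped outright.
    println(HiveTokenCollector.tokensRequired(conf))
    conf.set("spark.hadoop.hive.metastore.uris", "thrift://metastore-host:9083")
    // Now true if and only if Kerberos security is enabled.
    println(HiveTokenCollector.tokensRequired(conf))
  }
}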
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.session.security

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf

/**
 * An interface for secured service token collectors
 */
private[security] trait TokenCollector {

  /**
   * Obtain tokens from secured services, such as Hive Metastore Server, HDFS, etc.
   * @param conf a SparkConf
   */
  def obtainTokens(conf: SparkConf): Unit

  /**
   * Check whether the service to visit requires tokens
   * @param conf a SparkConf
   * @return true if the service to visit requires tokens
   */
  def tokensRequired(conf: SparkConf): Boolean = UserGroupInformation.isSecurityEnabled

}

private[session] object TokenCollector {

  /**
   * Obtain tokens from all secured services if required.
   * @param conf a SparkConf
   */
  def obtainTokenIfRequired(conf: SparkConf): Unit = {
    Seq(HiveTokenCollector, HDFSTokenCollector).foreach { co =>
      if (co.tokensRequired(conf)) co.obtainTokens(conf)
    }
  }
}
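The trait keeps the contract small: one method to fetch tokens, one guard that defaults to "only when Kerberos is on". A hypothetical skeleton showing the shape a collector for a third service would take (it would also need wiring into the Seq above):

package yaooqinn.kyuubi.session.security

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf

// Hypothetical: it must live in this package because the trait is
// private[security].
private[security] object ExampleTokenCollector extends TokenCollector {

  override def obtainTokens(conf: SparkConf): Unit = {
    // Fetch service-specific tokens here and attach them to the session user,
    // e.g. UserGroupInformation.getCurrentUser.addCredentials(creds).
  }

  // The trait's default only checks Kerberos; tighten it per service.
  override def tokensRequired(conf: SparkConf): Boolean =
    UserGroupInformation.isSecurityEnabled
}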
@@ -17,6 +17,7 @@

 package yaooqinn.kyuubi.utils

+import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction

 import scala.collection.JavaConverters._
@@ -61,8 +62,22 @@ private[kyuubi] object KyuubiHadoopUtil extends Logging {
   }

   def doAs[T](user: UserGroupInformation)(f: => T): T = {
-    user.doAs(new PrivilegedExceptionAction[T] {
-      override def run(): T = f
-    })
+    try {
+      user.doAs(new PrivilegedExceptionAction[T] {
+        override def run(): T = f
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e)
+    }
+  }
+
+  /**
+   * Run some code as the real logged in user (which may differ from the current user, for
+   * example, when using proxying).
+   */
+  def doAsRealUser[T](f: => T): T = {
+    val currentUser = UserGroupInformation.getCurrentUser
+    val realUser = Option(currentUser.getRealUser).getOrElse(currentUser)
+    doAs(realUser)(f)
   }
 }
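UserGroupInformation.doAs wraps checked exceptions it does not recognise in UndeclaredThrowableException, which is why the helper now unwraps the cause. A sketch demonstrating the raw behavior:

import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object DoAsUnwrapSketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createRemoteUser("demo")
    try {
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        // An arbitrary checked exception UGI does not special-case.
        override def run(): Unit = throw new Exception("boom")
      })
    } catch {
      // Raw UGI surfaces it wrapped; KyuubiHadoopUtil.doAs above would
      // re-throw the original "boom" exception instead.
      case e: UndeclaredThrowableException => println(e.getCause.getMessage)
    }
  }
}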
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.{KyuubiSparkUtil, SparkConf}

object KyuubiHiveUtil {

  private val HIVE_PREFIX = "hive."
  private val METASTORE_PREFIX = "metastore."

  val URIS: String = HIVE_PREFIX + METASTORE_PREFIX + "uris"
  val METASTORE_PRINCIPAL: String = HIVE_PREFIX + METASTORE_PREFIX + "kerberos.principal"

  def hiveConf(conf: SparkConf): Configuration = {
    val hadoopConf = KyuubiSparkUtil.newConfiguration(conf)
    new HiveConf(hadoopConf, classOf[HiveConf])
  }

}
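A sketch of hiveConf in use, assuming (as in Spark) that newConfiguration copies spark.hadoop.*-prefixed keys into the Hadoop configuration; the URI value is a placeholder:

import org.apache.spark.SparkConf

import yaooqinn.kyuubi.utils.KyuubiHiveUtil

object HiveConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.hadoop.hive.metastore.uris", "thrift://metastore-host:9083")
    val hiveConf = KyuubiHiveUtil.hiveConf(conf)
    // Prints the metastore URI injected through the SparkConf above.
    println(hiveConf.get(KyuubiHiveUtil.URIS))
  }
}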
22 changes: 22 additions & 0 deletions kyuubi-server/src/test/scala/yaooqinn/kyuubi/SecuredFunSuite.scala
@@ -17,22 +17,44 @@

 package yaooqinn.kyuubi

+import java.io.IOException
+
+import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{KyuubiSparkUtil, SparkConf}

 trait SecuredFunSuite {

+  var kdc: MiniKdc = null
+  val baseDir = KyuubiSparkUtil.createTempDir(namePrefix = "kyuubi-kdc")
+  try {
+    val kdcConf = MiniKdc.createConf()
+    kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
+    kdcConf.setProperty(MiniKdc.ORG_NAME, "KYUUBI")
+    kdcConf.setProperty(MiniKdc.ORG_DOMAIN, "COM")
+
+    if (kdc == null) {
+      kdc = new MiniKdc(kdcConf, baseDir)
+      kdc.start()
+    }
+  } catch {
+    case e: IOException =>
+      throw new AssertionError("unable to create temporary directory: " + e.getMessage)
+  }
+
   def tryWithSecurityEnabled(block: => Unit): Unit = {
     val conf = new SparkConf(true)
     assert(!UserGroupInformation.isSecurityEnabled)
     val authType = "spark.hadoop.hadoop.security.authentication"
     try {
       conf.set(authType, "KERBEROS")
+      System.setProperty("java.security.krb5.realm", kdc.getRealm)
       UserGroupInformation.setConfiguration(KyuubiSparkUtil.newConfiguration(conf))
       assert(UserGroupInformation.isSecurityEnabled)
       block
     } finally {
       conf.remove(authType)
+      System.clearProperty("java.security.krb5.realm")
       UserGroupInformation.setConfiguration(KyuubiSparkUtil.newConfiguration(conf))
       assert(!UserGroupInformation.isSecurityEnabled)
     }
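A hypothetical suite showing how the helper brackets a test body with Kerberos switched on and then restores the insecure default:

package yaooqinn.kyuubi

import org.apache.hadoop.security.UserGroupInformation
import org.scalatest.FunSuite

class SecuredFunSuiteExample extends FunSuite with SecuredFunSuite {

  test("security is enabled only inside the block") {
    tryWithSecurityEnabled {
      // MiniKdc is running and UGI sees Kerberos authentication here.
      assert(UserGroupInformation.isSecurityEnabled)
    }
    // The helper restores the simple-auth configuration afterwards.
    assert(!UserGroupInformation.isSecurityEnabled)
  }
}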