Merge pull request #123 from yaooqinn/KYUUBI-122
[KYUUBI-122] Obtain delegation tokens from possibly kerberized services
yaooqinn authored Dec 6, 2018
2 parents 1e1e83d + 0a166e7 commit 989068b
Showing 16 changed files with 513 additions and 22 deletions.
28 changes: 16 additions & 12 deletions kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
@@ -39,26 +39,30 @@ import yaooqinn.kyuubi.Logging
 object KyuubiSparkUtil extends Logging {
   // PREFIXES
   val SPARK_PREFIX = "spark."
-  private[this] val YARN_PREFIX = "yarn."
-  private[this] val HADOOP_PRFIX = "hadoop."
+  private val YARN_PREFIX = "yarn."
+  private val HADOOP_PRFIX = "hadoop."
   val SPARK_HADOOP_PREFIX: String = SPARK_PREFIX + HADOOP_PRFIX
-  private[this] val DRIVER_PREFIX = "driver."
-  private[this] val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am."
+  private val SPARK_YARN_PREFIX: String = SPARK_PREFIX + YARN_PREFIX
+  private val DRIVER_PREFIX = "driver."
+  private val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am."

-  private[this] val UI_PREFIX = "ui."
-  private[this] val SQL_PREFIX = "sql."
-  private[this] val HIVE_PREFIX = "hive."
-  private[this] val METASTORE_PREFIX = "metastore."
+  private val UI_PREFIX = "ui."
+  private val SQL_PREFIX = "sql."
+  private val HIVE_PREFIX = "hive."
+  private val METASTORE_PREFIX = "metastore."

   // ENVIRONMENTS
   val SPARK_HOME: String = System.getenv("SPARK_HOME")
   val SPARK_JARS_DIR: String = SPARK_HOME + File.separator + "jars"

   // YARN
-  val KEYTAB: String = SPARK_PREFIX + YARN_PREFIX + "keytab"
-  val PRINCIPAL: String = SPARK_PREFIX + YARN_PREFIX + "principal"
-  val MAX_APP_ATTEMPTS: String = SPARK_PREFIX + YARN_PREFIX + "maxAppAttempts"
-  val SPARK_YARN_JARS: String = SPARK_PREFIX + YARN_PREFIX + "jars"
+  val KEYTAB: String = SPARK_YARN_PREFIX + "keytab"
+  val PRINCIPAL: String = SPARK_YARN_PREFIX + "principal"
+  val MAX_APP_ATTEMPTS: String = SPARK_YARN_PREFIX + "maxAppAttempts"
+  val SPARK_YARN_JARS: String = SPARK_YARN_PREFIX + "jars"
+  val ACCESS_NNS: String = SPARK_YARN_PREFIX + "access.namenodes"
+  val ACCESS_FSS: String = SPARK_YARN_PREFIX + "access.hadoopFileSystems"
+  val STAGING_DIR: String = SPARK_YARN_PREFIX + "stagingDir"

   // DRIVER
   val DRIVER_BIND_ADDR: String = SPARK_PREFIX + DRIVER_PREFIX + "bindAddress"
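The three added keys mirror Spark on YARN's own security options, so a secured deployment can point Kyuubi at every filesystem it must reach. A minimal sketch of how they would be set (host names and paths here are illustrative, not from this PR):

import org.apache.spark.SparkConf

// The constants above resolve to spark.yarn.access.namenodes,
// spark.yarn.access.hadoopFileSystems and spark.yarn.stagingDir.
val conf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems", "hdfs://nn1:8020,hdfs://nn2:8020")
  .set("spark.yarn.stagingDir", "hdfs://nn1:8020/user/kyuubi/staging")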
kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/ServiceException.scala
@@ -23,7 +23,7 @@ package yaooqinn.kyuubi.service
  */
 class ServiceException(message: String, cause: Throwable) extends RuntimeException(message, cause) {

-  def this(cause: Throwable) = this("", cause)
+  def this(cause: Throwable) = this(cause.toString, cause)

   def this(message: String) = this(message, null)
 }
kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala
@@ -34,7 +34,9 @@ import yaooqinn.kyuubi.auth.KyuubiAuthFactory
 import yaooqinn.kyuubi.cli._
 import yaooqinn.kyuubi.operation.{KyuubiOperation, OperationHandle, OperationManager}
 import yaooqinn.kyuubi.schema.RowSet
+import yaooqinn.kyuubi.session.security.TokenCollector
 import yaooqinn.kyuubi.spark.SparkSessionWithUGI
+import yaooqinn.kyuubi.utils.KyuubiHadoopUtil

 /**
  * An Execution Session with [[SparkSession]] instance inside, which shares [[SparkContext]]
@@ -73,7 +75,9 @@ private[kyuubi] class KyuubiSession(
         // Do not check keytab file existing as spark-submit has it done
         currentUser.reloginFromKeytab()
       }
-      UserGroupInformation.createProxyUser(username, currentUser)
+      val user = UserGroupInformation.createProxyUser(username, currentUser)
+      KyuubiHadoopUtil.doAs(user)(TokenCollector.obtainTokenIfRequired(conf))
+      user
     } else {
       UserGroupInformation.createRemoteUser(username)
     }
@@ -276,7 +280,7 @@ private[kyuubi] class KyuubiSession(
     }
   }

-  private[this] def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = {
+  private def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = {
     acquire(false)
     try {
       operations.foreach { op =>
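Collecting the tokens inside doAs matters: UserGroupInformation.getCurrentUser then resolves to the proxy user, so the delegation tokens land in that user's credentials rather than the server's. A small hypothetical helper (not part of this PR) to inspect what a UGI ended up holding:

import scala.collection.JavaConverters._

import org.apache.hadoop.security.UserGroupInformation

// List the kind and service of every token attached to a UGI.
def dumpTokens(ugi: UserGroupInformation): Unit = {
  ugi.getCredentials.getAllTokens.asScala.foreach { token =>
    println(s"${token.getKind} for ${token.getService}")
  }
}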
kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HDFSTokenCollector.scala (new file)
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.session.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.KyuubiSparkUtil._
import org.apache.spark.SparkConf

import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.service.ServiceException

/**
* Token collector for secured HDFS FileSystems
*/
private[security] object HDFSTokenCollector extends TokenCollector with Logging {

  private def hadoopFStoAccess(conf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = {
    val fileSystems = conf.getOption(ACCESS_FSS)
      .orElse(conf.getOption(ACCESS_NNS)) match {
      case Some(nns) => nns.split(",").map(new Path(_).getFileSystem(hadoopConf)).toSet
      case _ => Set.empty[FileSystem]
    }

    fileSystems +
      conf.getOption(STAGING_DIR).map(new Path(_).getFileSystem(hadoopConf))
        .getOrElse(FileSystem.get(hadoopConf))
  }

  private def renewer(hadoopConf: Configuration): String = {
    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
    debug("Delegation token renewer is: " + tokenRenewer)

    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
      error(errorMessage)
      throw new ServiceException(errorMessage)
    }
    tokenRenewer
  }

  override def obtainTokens(conf: SparkConf): Unit = {
    val hadoopConf = newConfiguration(conf)
    val tokenRenewer = renewer(hadoopConf)
    val creds = new Credentials()
    hadoopFStoAccess(conf, hadoopConf).foreach { fs =>
      fs.addDelegationTokens(tokenRenewer, creds)
    }
    UserGroupInformation.getCurrentUser.addCredentials(creds)
  }
}
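For a single filesystem, the collector's core step boils down to the plain Hadoop API call below; a standalone sketch, with an illustrative namenode URI and renewer principal rather than values from this PR:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Obtain delegation tokens for one HDFS filesystem and attach them to the
// current user, mirroring what obtainTokens above does per filesystem.
val hadoopConf = new Configuration()
val fs = new Path("hdfs://nn1:8020/").getFileSystem(hadoopConf)
val creds = new Credentials()
fs.addDelegationTokens("yarn/rm@EXAMPLE.COM", creds) // renewer: the YARN RM principal
UserGroupInformation.getCurrentUser.addCredentials(creds)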
kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HiveTokenCollector.scala (new file)
@@ -0,0 +1,67 @@
/* Apache License, Version 2.0 header, identical to the one in HDFSTokenCollector.scala above */

package yaooqinn.kyuubi.session.security

import scala.util.control.NonFatal

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
import org.apache.spark.SparkConf

import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.utils.KyuubiHadoopUtil
import yaooqinn.kyuubi.utils.KyuubiHiveUtil._

private[security] object HiveTokenCollector extends TokenCollector with Logging {

  override def obtainTokens(conf: SparkConf): Unit = {
    try {
      val c = hiveConf(conf)
      val principal = c.getTrimmed(METASTORE_PRINCIPAL)
      val uris = c.getTrimmed(URIS)
      require(StringUtils.isNotEmpty(principal), METASTORE_PRINCIPAL + " Undefined")
      require(StringUtils.isNotEmpty(uris), URIS + " Undefined")
      val currentUser = UserGroupInformation.getCurrentUser.getUserName
      val credentials = new Credentials()
      KyuubiHadoopUtil.doAsRealUser {
        val hive = Hive.get(c, classOf[HiveConf])
        info(s"Getting token from Hive Metastore for owner $currentUser via $principal")
        val tokenString = hive.getDelegationToken(currentUser, principal)
        val token = new Token[DelegationTokenIdentifier]
        token.decodeFromUrlString(tokenString)
        info("Got " + DelegationTokenIdentifier.stringifyToken(token))
        credentials.addToken(new Text("hive.metastore.delegation.token"), token)
      }
      UserGroupInformation.getCurrentUser.addCredentials(credentials)
    } catch {
      case NonFatal(e) =>
        error("Failed to get token from hive metastore service", e)
    } finally {
      Hive.closeCurrent()
    }
  }

  override def tokensRequired(conf: SparkConf): Boolean = {
    UserGroupInformation.isSecurityEnabled && StringUtils.isNotBlank(hiveConf(conf).get(URIS))
  }
}
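Note the doAsRealUser wrapper: the metastore call must authenticate as the server's Kerberos-logged-in real user while requesting a token owned by the proxied session user. The collector fires only when both Hive settings below are present; a sketch of how they would typically arrive via the Spark conf, assuming KyuubiSparkUtil.newConfiguration copies spark.hadoop.* entries into the Hadoop configuration the way Spark's own utility does (values illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.hadoop.hive.metastore.uris", "thrift://metastore-host:9083")
  .set("spark.hadoop.hive.metastore.kerberos.principal", "hive/_HOST@EXAMPLE.COM")
// With these set, tokensRequired(conf) returns true on a secured cluster.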
kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/TokenCollector.scala (new file)
@@ -0,0 +1,54 @@
/* Apache License, Version 2.0 header, identical to the one in HDFSTokenCollector.scala above */

package yaooqinn.kyuubi.session.security

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf

/**
 * An interface for secured service token collectors
 */
private[security] trait TokenCollector {

  /**
   * Obtain tokens from secured services, such as the Hive Metastore Server, HDFS, etc.
   * @param conf a SparkConf
   */
  def obtainTokens(conf: SparkConf): Unit

  /**
   * Check whether the service to visit requires tokens
   * @param conf a SparkConf
   * @return true if the service to visit requires tokens
   */
  def tokensRequired(conf: SparkConf): Boolean = UserGroupInformation.isSecurityEnabled

}

private[session] object TokenCollector {

  /**
   * Obtain tokens from all secured services if required.
   * @param conf a SparkConf
   */
  def obtainTokenIfRequired(conf: SparkConf): Unit = {
    Seq(HiveTokenCollector, HDFSTokenCollector).foreach { co =>
      if (co.tokensRequired(conf)) co.obtainTokens(conf)
    }
  }
}
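Adding support for another kerberized service then amounts to one more object implementing the trait. A hypothetical skeleton (MyServiceTokenCollector and spark.myservice.uris are made-up names; a real collector would also need to be listed in the Seq above):

package yaooqinn.kyuubi.session.security

import org.apache.spark.SparkConf

private[security] object MyServiceTokenCollector extends TokenCollector {

  // Fetch a delegation token from the service and add it to
  // UserGroupInformation.getCurrentUser's credentials.
  override def obtainTokens(conf: SparkConf): Unit = {
    // service-specific token acquisition would go here
  }

  // Only worth calling when security is on and the service is configured.
  override def tokensRequired(conf: SparkConf): Boolean =
    super.tokensRequired(conf) && conf.contains("spark.myservice.uris")
}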
kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala
@@ -17,6 +17,7 @@

 package yaooqinn.kyuubi.utils

+import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction

 import scala.collection.JavaConverters._
@@ -61,8 +62,22 @@ private[kyuubi] object KyuubiHadoopUtil extends Logging {
   }

   def doAs[T](user: UserGroupInformation)(f: => T): T = {
-    user.doAs(new PrivilegedExceptionAction[T] {
-      override def run(): T = f
-    })
+    try {
+      user.doAs(new PrivilegedExceptionAction[T] {
+        override def run(): T = f
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e)
+    }
   }
+
+  /**
+   * Run some code as the real logged in user (which may differ from the current user, for
+   * example, when using proxying).
+   */
+  def doAsRealUser[T](f: => T): T = {
+    val currentUser = UserGroupInformation.getCurrentUser
+    val realUser = Option(currentUser.getRealUser).getOrElse(currentUser)
+    doAs(realUser)(f)
+  }
 }
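The try/catch matters because Hadoop's UGI.doAs rethrows IOException, InterruptedException, RuntimeException and Error as-is but wraps any other checked exception in UndeclaredThrowableException; unwrapping restores the original failure for callers. A quick demonstration sketch (assumes it runs inside the yaooqinn.kyuubi package, since the util is private[kyuubi]):

import java.util.concurrent.TimeoutException

import org.apache.hadoop.security.UserGroupInformation

import yaooqinn.kyuubi.utils.KyuubiHadoopUtil

val ugi = UserGroupInformation.createRemoteUser("alice")
try {
  // TimeoutException is a checked exception that UGI.doAs would otherwise
  // surface wrapped in UndeclaredThrowableException.
  KyuubiHadoopUtil.doAs(ugi) { throw new TimeoutException("boom") }
} catch {
  case e: TimeoutException => assert(e.getMessage == "boom")
}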
kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtil.scala (new file)
@@ -0,0 +1,37 @@
/* Apache License, Version 2.0 header, identical to the one in HDFSTokenCollector.scala above */

package yaooqinn.kyuubi.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.{KyuubiSparkUtil, SparkConf}

object KyuubiHiveUtil {

  private val HIVE_PREFIX = "hive."
  private val METASTORE_PREFIX = "metastore."

  val URIS: String = HIVE_PREFIX + METASTORE_PREFIX + "uris"
  val METASTORE_PRINCIPAL: String = HIVE_PREFIX + METASTORE_PREFIX + "kerberos.principal"

  def hiveConf(conf: SparkConf): Configuration = {
    val hadoopConf = KyuubiSparkUtil.newConfiguration(conf)
    new HiveConf(hadoopConf, classOf[HiveConf])
  }

}
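A quick usage sketch, assuming KyuubiSparkUtil.newConfiguration applies spark.hadoop.* overrides the way Spark's equivalent does (the URI value is illustrative):

import org.apache.spark.SparkConf

import yaooqinn.kyuubi.utils.KyuubiHiveUtil

val hc = KyuubiHiveUtil.hiveConf(
  new SparkConf().set("spark.hadoop.hive.metastore.uris", "thrift://mh:9083"))
// Reads back the metastore location the token collectors check.
assert(hc.get(KyuubiHiveUtil.URIS) == "thrift://mh:9083")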
22 changes: 22 additions & 0 deletions kyuubi-server/src/test/scala/yaooqinn/kyuubi/SecuredFunSuite.scala
@@ -17,22 +17,44 @@

package yaooqinn.kyuubi

import java.io.IOException

import org.apache.hadoop.minikdc.MiniKdc
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{KyuubiSparkUtil, SparkConf}

trait SecuredFunSuite {

  var kdc: MiniKdc = null
  val baseDir = KyuubiSparkUtil.createTempDir(namePrefix = "kyuubi-kdc")
  try {
    val kdcConf = MiniKdc.createConf()
    kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
    kdcConf.setProperty(MiniKdc.ORG_NAME, "KYUUBI")
    kdcConf.setProperty(MiniKdc.ORG_DOMAIN, "COM")

    if (kdc == null) {
      kdc = new MiniKdc(kdcConf, baseDir)
      kdc.start()
    }
  } catch {
    case e: IOException =>
      throw new AssertionError("unable to create temporary directory: " + e.getMessage)
  }

  def tryWithSecurityEnabled(block: => Unit): Unit = {
    val conf = new SparkConf(true)
    assert(!UserGroupInformation.isSecurityEnabled)
    val authType = "spark.hadoop.hadoop.security.authentication"
    try {
      conf.set(authType, "KERBEROS")
      System.setProperty("java.security.krb5.realm", kdc.getRealm)
      UserGroupInformation.setConfiguration(KyuubiSparkUtil.newConfiguration(conf))
      assert(UserGroupInformation.isSecurityEnabled)
      block
    } finally {
      conf.remove(authType)
      System.clearProperty("java.security.krb5.realm")
      UserGroupInformation.setConfiguration(KyuubiSparkUtil.newConfiguration(conf))
      assert(!UserGroupInformation.isSecurityEnabled)
    }