[SNAP 3104] Showing external hive metastore tables as part of HIVETABLES VTI #1472

Open · wants to merge 7 commits into master
@@ -17,15 +17,14 @@
package io.snappydata.gemxd

import java.io.{File, InputStream}
import java.util.{Iterator => JIterator}
import java.{lang, util}
import java.util.{List, Iterator => JIterator}

import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl
import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, GemFireCacheImpl}
import com.gemstone.gemfire.internal.shared.Version
import com.gemstone.gemfire.internal.{ByteArrayDataInput, ClassPathLoader, GemFireVersion}
import com.pivotal.gemfirexd.Attribute
@@ -38,13 +37,18 @@ import io.snappydata.cluster.ExecutorInitiator
import io.snappydata.impl.LeadImpl
import io.snappydata.recovery.RecoveryService
import io.snappydata.remote.interpreter.SnappyInterpreterExecute
import io.snappydata.sql.catalog.CatalogObjectType
import io.snappydata.util.ServiceUtils
import io.snappydata.{ServiceManager, SnappyEmbeddedTableStatsProviderService}
import org.apache.spark.Logging
import org.apache.spark.scheduler.cluster.SnappyClusterManager
import org.apache.spark.serializer.{KryoSerializerPool, StructTypeSerializer}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.collection.ToolsCallbackInit
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.columnar.ExternalStoreUtils
import org.apache.spark.sql.{SaveMode, SnappyContext}

/**
* Callbacks that are sent by GemXD to Snappy for cluster management
@@ -53,6 +57,8 @@ object ClusterCallbacksImpl extends ClusterCallbacks with Logging {

CallbackFactoryProvider.setClusterCallbacks(this)

private val PASSWORD_MATCH = "(?i)(password|passwd|secret).*".r

private[snappydata] def initialize(): Unit = {
// nothing to be done; singleton constructor does all
}
@@ -265,6 +271,37 @@ object ClusterCallbacksImpl extends ClusterCallbacks with Logging {
}
}

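/**
 * Returns metadata for all tables visible in the external hive metastore (the SYS schema is
 * skipped) so they can be surfaced through the HIVETABLES VTI.
 */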
override def getHiveTablesMetadata(): util.Collection[ExternalTableMetaData] = {
val catalogTables = SnappyContext.getHiveCatalogTables()
import scala.collection.JavaConverters._
getTablesMetadata(catalogTables).asJava
}

private def getTablesMetadata(catalogTables: Seq[CatalogTable]): Seq[ExternalTableMetaData] = {
catalogTables.map(catalogTableToMetadata)
}

private def catalogTableToMetadata(table: CatalogTable): ExternalTableMetaData = {
val tableType = CatalogObjectType.getTableType(table)
val tblDataSourcePath = table.storage.locationUri match {
case None => ""
case Some(l) => ServiceUtils.maskLocationURI(l)
}

val metaData = new ExternalTableMetaData(table.identifier.table,
table.database, tableType.toString, null, -1,
-1, null, null, null, null,
tblDataSourcePath, "", false)
metaData.provider = table.provider match {
case None => ""
case Some(p) => p
}
metaData.shortProvider = metaData.provider
metaData.columns = ExternalStoreUtils.getColumnMetadata(table.schema)
metaData
}

override def getInterpreterExecution(sql: String, v: Version,
connId: lang.Long): InterpreterExecute = new SnappyInterpreterExecute(sql, connId)

@@ -279,7 +279,6 @@ object SnappyExternalCatalog {
val INDEXED_TABLE_LOWER: String = Utils.toLowerCase("INDEXED_TABLE")

val EMPTY_SCHEMA: StructType = StructType(Nil)
private[sql] val PASSWORD_MATCH = "(?i)(password|passwd|secret).*".r

val currentFunctionIdentifier = new ThreadLocal[FunctionIdentifier]

@@ -41,6 +41,7 @@ import io.snappydata.Constant.{SPARK_STORE_PREFIX, STORE_PROPERTY_PREFIX}
import io.snappydata.sql.catalog.SnappyExternalCatalog.checkSchemaPermission
import io.snappydata.sql.catalog.{CatalogObjectType, ConnectorExternalCatalog, SnappyExternalCatalog}
import io.snappydata.thrift._
import io.snappydata.util.ServiceUtils
import org.apache.log4j.{Level, LogManager}

import org.apache.spark.sql.catalyst.TableIdentifier
@@ -511,41 +512,25 @@ class StoreHiveCatalog extends ExternalCatalog with Logging {
}
}

private def maskPassword(s: String): String = {
SnappyExternalCatalog.PASSWORD_MATCH.replaceAllIn(s, "xxx")
}

// Mask access key and secret access key in case of S3 URI
private def maskLocationURI(locURI: String): String = {
val uri = toLowerCase(locURI)
val maskedSrcPath = if ((uri.startsWith("s3a://") ||
uri.startsWith("s3://") ||
uri.startsWith("s3n://")) && uri.contains("@")) {
locURI.replace(locURI.slice(locURI.indexOf("//") + 2,
locURI.indexOf("@")), "****:****")
} else maskPassword(locURI)
maskedSrcPath
}

// mask any credentials (including S3 access keys) embedded in the data source path
private def getDataSourcePath(properties: scala.collection.Map[String, String],
storage: CatalogStorageFormat): String = {
properties.get("path") match {
case Some(p) if !p.isEmpty => maskLocationURI(p)
case Some(p) if !p.isEmpty => ServiceUtils.maskLocationURI(p)
case _ => properties.get("region.path") match { // for external GemFire connector
case Some(p) if !p.isEmpty => maskLocationURI(p)
case Some(p) if !p.isEmpty => ServiceUtils.maskLocationURI(p)
case _ => properties.get("url") match { // jdbc
case Some(p) if !p.isEmpty =>
// mask the password if present
val url = maskLocationURI(p)
val url = ServiceUtils.maskLocationURI(p)
// add dbtable if present
properties.get(SnappyExternalCatalog.DBTABLE_PROPERTY) match {
case Some(d) if !d.isEmpty => s"$url; ${SnappyExternalCatalog.DBTABLE_PROPERTY}=$d"
case _ => url
}
case _ => storage.locationUri match { // fallback to locationUri
case None => ""
case Some(l) => maskLocationURI(l)
case Some(l) => ServiceUtils.maskLocationURI(l)
}
}
}
21 changes: 21 additions & 0 deletions core/src/main/scala/io/snappydata/util/ServiceUtils.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import java.util.regex.Pattern

import scala.collection.JavaConverters._
import scala.util.matching.Regex

import _root_.com.gemstone.gemfire.distributed.DistributedMember
import _root_.com.gemstone.gemfire.distributed.internal.DistributionConfig
@@ -33,6 +34,7 @@ import io.snappydata.{Constant, Property, ServerManager, SnappyTableStatsProvide
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.collection.Utils
import org.apache.spark.sql.hive.HiveClientUtil
import org.apache.spark.sql.sources.JdbcExtendedUtils.toLowerCase
import org.apache.spark.sql.{SnappyContext, SparkSession, ThinClientConnectorMode}
import org.apache.spark.{SparkContext, SparkEnv}

@@ -41,6 +43,7 @@ import org.apache.spark.{SparkContext, SparkEnv}
*/
object ServiceUtils {

val PASSWORD_MATCH: Regex = "(?i)(password|passwd|secret).*".r
val LOCATOR_URL_PATTERN: Pattern = Pattern.compile("(.+:[0-9]+)|(.+\\[[0-9]+\\])")

private[snappydata] def getStoreProperties(
@@ -178,6 +181,24 @@
}
}

/**
* Masks access key and secret access key in case of S3 URI
*/
def maskLocationURI(locURI: String): String = {
val uri = toLowerCase(locURI)
val maskedSrcPath = if ((uri.startsWith("s3a://") ||
uri.startsWith("s3://") ||
uri.startsWith("s3n://")) && uri.contains("@")) {
locURI.replace(locURI.slice(locURI.indexOf("//") + 2,
locURI.indexOf("@")), "****:****")
} else maskPassword(locURI)
maskedSrcPath
}
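// Illustrative sketch of the masking behaviour above, with hypothetical inputs:
//   maskLocationURI("s3a://AKIAEXAMPLE:secretKey@my-bucket/data")
//     => "s3a://****:****@my-bucket/data"   (everything between "//" and "@" is masked)
//   maskLocationURI("jdbc:mysql://host:3306/db;password=abc")
//     => "jdbc:mysql://host:3306/db;xxx"    (PASSWORD_MATCH replaces the rest of the string)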

private def maskPassword(s: String): String = {
PASSWORD_MATCH.replaceAllIn(s, "xxx")
}

/**
* We capture table ddl string and add it to table properties before adding to catalog.
* This will also contain passwords in the string such as jdbc connection string or
18 changes: 17 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/SnappyContext.scala
@@ -37,6 +37,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
import com.pivotal.gemfirexd.Attribute
import com.pivotal.gemfirexd.internal.engine.Misc
import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils
import com.pivotal.gemfirexd.internal.engine.store.GemFireStore.StoreAdvisee
import com.pivotal.gemfirexd.internal.shared.common.SharedUtils
import io.snappydata.sql.catalog.{CatalogObjectType, ConnectorExternalCatalog}
import io.snappydata.util.ServiceUtils
@@ -49,6 +50,7 @@ import org.apache.spark.memory.MemoryManagerCallback
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerExecutorAdded}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.SortDirection
import org.apache.spark.sql.collection.{ToolsCallbackInit, Utils}
import org.apache.spark.sql.execution.columnar.ExternalStoreUtils.CaseInsensitiveMutableHashMap
@@ -874,6 +876,12 @@
}
}

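/**
 * Lists the [[CatalogTable]] metadata of all tables in the hive session catalog, skipping the
 * schemas named in `skipSchemas` (by default only the internal SYS schema).
 */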
def getHiveCatalogTables(skipSchemas: Seq[String] = "SYS" :: Nil): Seq[CatalogTable] = {
val catalog = hiveSession.sessionState.catalog
catalog.listDatabases().filter(s => skipSchemas.isEmpty || !skipSchemas.contains(s)).
flatMap(schema => catalog.listTables(schema).map(table => catalog.getTableMetadata(table)))
}

private[spark] def getBlockIdIfNull(
executorId: String): Option[BlockAndExecutorId] =
Option(storeToBlockMap.get(executorId))
@@ -1190,7 +1198,7 @@
def newHiveSession(): SparkSession = contextLock.synchronized {
val sc = globalSparkContext
sc.conf.set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive")
if (this.hiveSession ne null) this.hiveSession.newSession()
val hiveSession = if (this.hiveSession ne null) this.hiveSession.newSession()
else {
val session = SparkSession.builder().enableHiveSupport().getOrCreate()
if (session.sharedState.externalCatalog.isInstanceOf[HiveExternalCatalog] &&
Expand All @@ -1203,6 +1211,14 @@ object SnappyContext extends Logging {
this.hiveSession
}
}
updateAndDistributeProfile()
hiveSession
}

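/**
 * Records on this member's store advisee that the hive session has been initialized and
 * distributes the updated profile to the other members of the cluster.
 */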
private def updateAndDistributeProfile(): Unit = {
val advisee = GemFireXDUtils.getGfxdAdvisor.getAdvisee.asInstanceOf[StoreAdvisee]
advisee.setHiveSessionInitialized(true)
GemFireXDUtils.getGfxdAdvisor.distributeProfileUpdate()
}

def hasHiveSession: Boolean = contextLock.synchronized(this.hiveSession ne null)