[WIP] Add CLI Support for Catalyst #337
Open · chenghao-intel wants to merge 21 commits into amplab:sparkSql from chenghao-intel:sparkSqlBack
Changes from all commits (21 commits, all by chenghao-intel):

- `2c06e38` [WIP] Initial SharkCliDriver-compatible CLI implementation
- `5a3d9f8` Update the jar dependencies
- `0c2d7f6` Fix ClassCastException
- `0477652` Fix a bug in the CLI prompt when switching to Hive
- `0afbc0f` Update README
- `ef29e99` Fix a bug in getting schema info
- `6c1d9f5` Add bug info in the README
- `3d344d0` Remove the mistaken commit
- `93b027f` Enable the CLI testing
- `d752ed5` Remove the mistaken commit
- `6e7b4d2` Add some documentation
- `3050f80` Add CacheRdd reload support
- `3e652fe` Update README for supporting the cached reload
- `ca6255f` Output error message for HQL
- `b5c031b` Solve the netty / servlet-api jar conflict
- `da57ff6` Jar conflict & workaround for CliSessionState modified by HiveContext
- `b6792db` Remove the cached table reload for the next PR
- `bf326ff` Minimize the changes to SharkBuild.scala
- `a3732b9` Put the local Maven repo as the last resolver
- `02652cf` Remove the unused class
- `3470679` Make the unit test work
New file (138 lines added): `org.apache.spark.sql.hive.CatalystContext`:
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql
package hive

import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._

import org.apache.hive.service.cli.TableSchema
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.cli.CliDriver
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.processors.CommandProcessor
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.hadoop.hive.ql.Driver

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.catalyst.plans.logical.ExplainCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecutionException

import shark.LogHelper

// TODO: This is a workaround for HiveContext; it should be fixed in the Spark
// project itself (sql/hive), not here.
case class CatalystContext(sc: SparkContext) extends HiveContext(sc) with LogHelper {
  @transient protected[hive] override lazy val hiveconf = sessionState.getConf()
  @transient protected[hive] override lazy val sessionState = SessionState.get()

  class HiveQLQueryExecution(hql: String) extends QueryExecution {
    override def logical: LogicalPlan = HiveQl.parseSql(hql)
    override def toString = hql + "\n" + super.toString

    /**
     * Query result as (error code, result rows, exception if any).
     * An error code of 0 means the query produced a result; any other value
     * means it failed, and the accompanying Throwable describes the failure.
     */
    def result(): (Int, Seq[String], Throwable) = analyzed match {
      case NativeCommand(cmd) => runOnHive(cmd)
      case ExplainCommand(plan) => (0, executePlan(plan).toString.split("\n"), null)
      case query =>
        try {
          val result: Seq[Seq[Any]] = toRdd.collect().toSeq
          // We need the types so we can output struct field names.
          val types = analyzed.output.map(_.dataType)
          // Reformat to match Hive's tab-delimited output.
          (0, result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq, null)
        } catch {
          case e: Throwable =>
            logError(s"Error:\n$hql\n", e)
            (-1, Seq[String](), e)
        }
    }

    /**
     * Get the result set table schema.
     */
    def getResultSetSchema: TableSchema = {
      logger.warn(s"Result Schema: ${analyzed.output}")
      if (analyzed.output.size == 0) {
        new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
      } else {
        val schema = analyzed.output.map { attr =>
          new FieldSchema(attr.name,
            org.apache.spark.sql.hive.HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
        }
        new TableSchema(schema)
      }
    }
  }

  def runOnHive(cmd: String, maxRows: Int = 1000): (Int, Seq[String], Throwable) = {
    try {
      val cmd_trimmed: String = cmd.trim()
      val tokens: Array[String] = cmd_trimmed.split("\\s+")
      val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
      val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hiveconf)

      proc match {
        case driver: Driver =>
          driver.init()

          val results = new JArrayList[String]
          val response: CommandProcessorResponse = driver.run(cmd)
          // Report a failure if there is an error in query processing.
          if (response.getResponseCode != 0) {
            driver.destroy()
            (response.getResponseCode, Seq[String](response.getErrorMessage()), new Exception(cmd))
          } else {
            driver.setMaxRows(maxRows)
            driver.getResults(results)
            driver.destroy()
            (0, results, null)
          }
        case _ =>
          SessionState.get().out.println(tokens(0) + " " + cmd_1)
          val res = proc.run(cmd_1)
          if (res.getResponseCode == 0) {
            (0, Seq[String](), null)
          } else {
            (res.getResponseCode, Seq[String](res.getErrorMessage()), new Exception(cmd_1))
          }
      }
    } catch {
      case e: Throwable =>
        logger.error(
          s"""
            |======================
            |HIVE FAILURE OUTPUT
            |======================
            |${outputBuffer.toString}
            |======================
            |END HIVE FAILURE OUTPUT
            |======================
          """.stripMargin)
        (-2, Seq[String](), e)
    }
  }
}
```
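For reference, the `(Int, Seq[String], Throwable)` tuple returned by `result()` is meant to be pattern-matched on the error code. A minimal caller sketch, assuming an already-constructed `SparkContext` named `sc` and an illustrative query (neither is part of this diff):

```scala
// Sketch only: `sc` is an assumed, pre-existing SparkContext,
// and the HQL statement is illustrative.
val cc = CatalystContext(sc)
val execution = new cc.HiveQLQueryExecution("SELECT key, value FROM src LIMIT 10")

execution.result() match {
  case (0, rows, _) =>
    rows.foreach(println)  // success: tab-delimited rows
  case (code, _, cause) =>
    println(s"Query failed with code $code: $cause")
}
```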
New file (92 lines added): `shark.CatalystDriver`:
```scala
/*
 * Copyright (C) 2012 The Regents of The University of California.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package shark

import java.util.ArrayList

import scala.collection.JavaConversions._

import org.apache.commons.lang.exception.ExceptionUtils

import org.apache.hive.service.cli.TableSchema

import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.metastore.api.Schema
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

import org.apache.spark.sql.hive.CatalystContext

class CatalystDriver(val context: CatalystContext = CatalystEnv.cc) extends Driver with LogHelper {
  private var tschema: TableSchema = _
  private var result: (Int, Seq[String], Throwable) = _

  override def init(): Unit = {
  }

  override def run(command: String): CommandProcessorResponse = {
    val execution = new context.HiveQLQueryExecution(command)

    // TODO: unify the error codes.
    try {
      result = execution.result
      tschema = execution.getResultSetSchema

      if (result._1 != 0) {
        logError(s"Failed in [$command]", result._3)
        new CommandProcessorResponse(result._1, ExceptionUtils.getFullStackTrace(result._3), null)
      } else {
        new CommandProcessorResponse(result._1)
      }
    } catch {
      case t: Throwable =>
        logError(s"Failed in [$command]", t)
        new CommandProcessorResponse(-3, ExceptionUtils.getFullStackTrace(t), null)
    }
  }

  override def close(): Int = {
    result = null
    tschema = null

    0
  }

  /**
   * Get the result schema. CatalystDriver doesn't support this yet.
   * TODO: Catalyst returns a TableSchema (org.apache.hive.service.cli.TableSchema),
   * but Driver requires a Schema (org.apache.hadoop.hive.metastore.api.Schema);
   * we need to figure out how to convert the former into the latter.
   */
  override def getSchema(): Schema = throw new UnsupportedOperationException("for getSchema")

  def getTableSchema = tschema

  override def getResults(res: ArrayList[String]): Boolean = {
    if (result == null) {
      false
    } else {
      res.addAll(result._2)
      result = null
      true
    }
  }

  override def destroy() {
    result = null
    tschema = null
  }
}
```
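A short sketch of the `Driver` lifecycle the Hive CLI exercises on this class: `run()` executes the statement, `getResults()` drains the buffered rows, and `close()` resets the driver state. This is illustrative only and not part of the diff; it assumes `CatalystEnv.cc` has already been initialized so the default constructor argument resolves:

```scala
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._

val driver = new CatalystDriver()  // uses CatalystEnv.cc by default
val response = driver.run("SELECT COUNT(*) FROM src")
if (response.getResponseCode == 0) {
  val rows = new JArrayList[String]
  // getResults returns true only while an undrained result is buffered;
  // CatalystDriver clears its buffer after the first successful call.
  while (driver.getResults(rows)) {
    rows.foreach(println)
    rows.clear()
  }
}
driver.close()
```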
Review comment:
I think at least some of the issues that necessitate this class's existence have been fixed (e.g. EXPLAIN throwing exceptions). I'm fine with leaving these other fixes here for now, but can you file some JIRAs for the ones that aren't fixed in Spark?