diff --git a/project/SharkBuild.scala b/project/SharkBuild.scala
index 45a35042..c1f06d9a 100755
--- a/project/SharkBuild.scala
+++ b/project/SharkBuild.scala
@@ -56,7 +56,7 @@ object SharkBuild extends Build {
 
   // Whether to build Shark with Tachyon jar.
   val TACHYON_ENABLED = true
-  val TACHYON_VERSION = "0.4.1"
+  val TACHYON_VERSION = "0.5.0"
 
   lazy val root = Project(
     id = "root",
diff --git a/src/main/scala/shark/SharkConfVars.scala b/src/main/scala/shark/SharkConfVars.scala
index b5592024..d2fe00f2 100755
--- a/src/main/scala/shark/SharkConfVars.scala
+++ b/src/main/scala/shark/SharkConfVars.scala
@@ -57,6 +57,16 @@ object SharkConfVars {
   // Number of mappers to force for table scan jobs
   val NUM_MAPPERS = new ConfVar("shark.map.tasks", -1)
 
+  // WriteType for the Tachyon off-heap table writer, e.g. "TRY_CACHE", "MUST_CACHE",
+  // "CACHE_THROUGH", "THROUGH".
+  // For reliability we strongly recommend the default "CACHE_THROUGH", which writes the table
+  // synchronously to the underlying filesystem and also caches the columns.
+  // "TRY_CACHE" and "MUST_CACHE" only cache the table, which gives better write performance,
+  // but use them with care: if the entire table cannot be fully cached, the evicted parts of
+  // the data are lost for good.
+  // "THROUGH" only writes the table to the underlying filesystem, with no caching at all.
+  val TACHYON_WRITER_WRITETYPE = new ConfVar("shark.tachyon.writetype", "CACHE_THROUGH")
+
   // Add Shark configuration variables and their default values to the given conf,
   // so default values show up in 'set'.
   def initializeWithDefaults(conf: Configuration) {
diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index 2d44fbf9..956b2d27 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -22,16 +22,18 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.BeanProperty
 
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.io.Writable
-import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.{TaskContext, SerializableWritable}
 
 import shark.{SharkConfVars, SharkEnv}
 import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
 import shark.memstore2._
 
-
 /**
  * Cache the RDD and force evaluate it (so the cache is filled).
  */
@@ -129,16 +131,12 @@ class MemoryStoreSinkOperator extends TerminalOperator {
       // Put the table in off-heap storage.
       op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName))
       offHeapWriter.createTable()
-      outputRDD = outputRDD.mapPartitionsWithIndex { case(part, iter) =>
-        val partition = iter.next()
-        partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
-          offHeapWriter.writeColumnPartition(column, part, buf)
-        }
-        Iterator(partition)
-      }
-      // Force evaluate so the data gets put into off-heap storage.
+      val broadcastedHiveConf
+        = outputRDD.context.broadcast(new SerializableWritable(op.getLocalHconf))
       outputRDD.context.runJob(
-        outputRDD, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
+        outputRDD,
+        MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter, broadcastedHiveConf))
+      offHeapWriter.cleanTmpPath()
     } else {
       // Run a job on the RDD that contains the query output to force the data into the memory
       // store. The statistics will also be collected by 'statsAcc' during job execution.
@@ -208,3 +206,24 @@ class MemoryStoreSinkOperator extends TerminalOperator {
   override def processPartition(split: Int, iter: Iterator[_]): Iterator[_] =
     throw new UnsupportedOperationException("CacheSinkOperator.processPartition()")
 }
+
+object MemoryStoreSinkOperator {
+  def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter,
+      broadcastedHiveConf: Broadcast[SerializableWritable[HiveConf]]) = {
+    def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
+      val partId = context.partitionId
+      val partition = iter.next().asInstanceOf[TablePartition]
+      val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
+      var writeBytes: Long = 0
+      partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
+        offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value)
+        offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir)
+        writeBytes += buf.limit
+      }
+      val numColumns = partition.columns.size + 1
+      offHeapWriter.commitPartition(partId, numColumns, taskTmpDir)
+      writeBytes
+    }
+    writeFiles _
+  }
+}
diff --git a/src/main/scala/shark/execution/SparkLoadTask.scala b/src/main/scala/shark/execution/SparkLoadTask.scala
index 64ae4567..f3b4527a 100644
--- a/src/main/scala/shark/execution/SparkLoadTask.scala
+++ b/src/main/scala/shark/execution/SparkLoadTask.scala
@@ -231,18 +231,14 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
         OffHeapStorageClient.client.dropTablePartition(tableKey, hivePartitionKeyOpt)
       }
       offHeapWriter.createTable()
-      transformedRdd = transformedRdd.mapPartitionsWithIndex { case(part, iter) =>
-        val partition = iter.next()
-        partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
-          offHeapWriter.writeColumnPartition(column, part, buf)
-        }
-        Iterator(partition)
-      }
+      transformedRdd.context.runJob(
+        transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter,
+          broadcastedHiveConf))
     } else {
       transformedRdd.persist(StorageLevel.MEMORY_AND_DISK)
+      transformedRdd.context.runJob(
+        transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
     }
-    transformedRdd.context.runJob(
-      transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
     if (work.cacheMode == CacheType.OFFHEAP) {
       offHeapWriter.setStats(statsAcc.value.toMap)
     }
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 9193b2c2..364fe89c 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -20,6 +20,9 @@ package shark.memstore2
 import java.util
 import java.nio.ByteBuffer
 
+import scala.reflect.BeanProperty
+
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.spark.rdd.RDD
 
 import shark.LogHelper
@@ -67,6 +70,7 @@ abstract class OffHeapStorageClient {
 }
 
 abstract class OffHeapTableWriter extends Serializable {
+  @transient @BeanProperty var localHconf: HiveConf = _
 
   /** Creates this table. Called only on the driver node. */
   def createTable()
@@ -74,8 +78,14 @@ abstract class OffHeapTableWriter extends Serializable {
   /** Sets stats on this table. Called only on the driver node. */
   def setStats(indexToStats: collection.Map[Int, TablePartitionStats])
 
-  /** Write the data of a partition of a given column. Called only on worker nodes. */
-  def writeColumnPartition(column: Int, part: Int, data: ByteBuffer)
+  /** Writes one column of a partition into the given temp dir. Called only on worker nodes. */
+  def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
+
+  /** Moves this partition's columns from the temp dir into the table. Called on worker nodes. */
+  def commitPartition(part: Int, numColumns: Int, tempDir: String)
+
+  /** Deletes the table's temp dir. Called on the driver node after the write job finishes. */
+  def cleanTmpPath()
 }
 
 /** Responsible for creating OffHeapStorageClients. Will be called on master and worker nodes. */
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 5a1288b0..04c45563 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -19,18 +19,22 @@ package shark.tachyon
 
 import java.nio.ByteBuffer
 
-import tachyon.client.WriteType
+import scala.reflect.BeanProperty
 
-import shark.LogHelper
+import shark.{LogHelper, SharkConfVars}
 import shark.execution.serialization.JavaSerializer
 import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}
 
+import tachyon.client.WriteType
+import tachyon.master.MasterInfo
+import tachyon.util.CommonUtils
+
 class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int)
   extends OffHeapTableWriter with LogHelper {
 
   // Re-instantiated upon deserialization, the first time it's referenced.
   @transient lazy val tfs = OffHeapStorageClient.client.asInstanceOf[TachyonStorageClient].tfs
-
+  val TEMP = "_temperary"
   var rawTableId: Int = -1
 
   override def createTable() {
@@ -47,12 +51,29 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
   // This is only used on worker nodes.
   @transient lazy val rawTable = tfs.getRawTable(rawTableId)
 
-  override def writeColumnPartition(column: Int, part: Int, data: ByteBuffer) {
-    val rawColumn = rawTable.getRawColumn(column)
-    rawColumn.createPartition(part)
-    val file = rawColumn.getPartition(part)
-    val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
+  override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
+    val fid = tfs.createFile(CommonUtils.concat(tmpPath, tempDir, column + "", part + ""))
+    val file = tfs.getFile(fid)
+    val writeType: WriteType = WriteType.valueOf(
+      SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
+    val outStream = file.getOutStream(writeType)
     outStream.write(data.array(), 0, data.limit())
     outStream.close()
   }
+
+  override def commitPartition(part: Int, numColumns: Int, tempDir: String) {
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
+    (0 until numColumns).reverse.foreach { column =>
+      val srcPath = CommonUtils.concat(tmpPath, tempDir, column + "", part + "")
+      val destPath = CommonUtils.concat(rawTable.getPath(), MasterInfo.COL, column + "", part + "")
+      tfs.rename(srcPath, destPath)
+    }
+    tfs.delete(CommonUtils.concat(tmpPath, tempDir), true)
+  }
+
+  override def cleanTmpPath() {
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
+    tfs.delete(tmpPath, true)
+  }
 }
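
A quick usage sketch (not part of the patch): the new "shark.tachyon.writetype" setting is resolved on the workers from the broadcast HiveConf, so overriding it in the session conf is enough to change how the Tachyon writer persists columns. The conf object and the "TRY_CACHE" override below are illustrative placeholders, e.g. for a Spark/Shark shell session.

    import org.apache.hadoop.hive.conf.HiveConf
    import tachyon.client.WriteType
    import shark.SharkConfVars

    val conf = new HiveConf()
    SharkConfVars.initializeWithDefaults(conf)         // seeds the "CACHE_THROUGH" default
    conf.set("shark.tachyon.writetype", "TRY_CACHE")   // hypothetical override: cache-only writes
    // Same resolution path writePartitionColumn() uses on the worker side:
    val writeType = WriteType.valueOf(
      SharkConfVars.getVar(conf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
    // writeType == WriteType.TRY_CACHE; an unrecognized name makes valueOf throw
    // IllegalArgumentException.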
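
For reference, here is the call order the reworked OffHeapTableWriter contract implies, pieced together from the call sites in this patch (MemoryStoreSinkOperator.processOffHeapSinkPartition and SparkLoadTask). This is only a sketch: the writer, conf, and column buffers are placeholders, and real tasks derive the temp dir name from their stageId/partitionId/attemptId.

    import java.nio.ByteBuffer
    import org.apache.hadoop.hive.conf.HiveConf
    import shark.memstore2.OffHeapTableWriter

    def writeTableSketch(writer: OffHeapTableWriter, hconf: HiveConf,
        partitions: Seq[Array[ByteBuffer]]) {
      writer.createTable()                              // driver side, before the write job
      partitions.zipWithIndex.foreach { case (buffers, part) =>
        val taskTmpDir = "stage_" + part + "_attempt0"  // placeholder for stageId_partitionId_attemptId
        writer.setLocalHconf(hconf)                     // normally the broadcast HiveConf, set in each task
        buffers.zipWithIndex.foreach { case (buf, column) =>
          writer.writePartitionColumn(part, column, buf, taskTmpDir)
        }
        writer.commitPartition(part, buffers.length, taskTmpDir)
      }
      writer.cleanTmpPath()                             // driver side, after the job finishes
    }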