From 6ed51582af8c468fcef996e392a6e9c31f88dc62 Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Tue, 10 Jun 2014 09:04:08 +0800
Subject: [PATCH 1/8] fix bug of failing to write to tachyon when speculation execution is on

---
 .../execution/MemoryStoreSinkOperator.scala | 36 +++++++++++++------
 .../memstore2/OffHeapStorageClient.scala    |  6 ++++
 .../tachyon/TachyonOffHeapTableWriter.scala | 30 ++++++++++++++--
 3 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index 2d44fbf9..b10a00c6 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -31,7 +31,7 @@
 import shark.{SharkConfVars, SharkEnv}
 import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
 import shark.memstore2._
-
+import org.apache.spark.TaskContext
 /**
  * Cache the RDD and force evaluate it (so the cache is filled).
  */
@@ -80,7 +80,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
     localHconf.setInt(SharkConfVars.COLUMN_BUILDER_PARTITION_SIZE.varname, partitionSize)
     localHconf.setBoolean(SharkConfVars.COLUMNAR_COMPRESSION.varname, shouldCompress)
   }
-
+
   override def execute(): RDD[_] = {
     val inputRdd = if (parentOperators.size == 1) executeParents().head._2 else null

@@ -129,16 +129,9 @@
       // Put the table in off-heap storage.
       op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName))
       offHeapWriter.createTable()
-      outputRDD = outputRDD.mapPartitionsWithIndex { case(part, iter) =>
-        val partition = iter.next()
-        partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
-          offHeapWriter.writeColumnPartition(column, part, buf)
-        }
-        Iterator(partition)
-      }
-      // Force evaluate so the data gets put into off-heap storage.
       outputRDD.context.runJob(
-        outputRDD, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
+        outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(op, offHeapWriter))
+      offHeapWriter.cleanTmpPath()
     } else {
       // Run a job on the RDD that contains the query output to force the data into the memory
       // store. The statistics will also be collected by 'statsAcc' during job execution.
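
The hunk above is the heart of the fix: with speculative execution enabled, two attempts of the same task could previously run the mapPartitionsWithIndex block at the same time and write the same Tachyon files, so one attempt failed. The new code stages every attempt's output under a directory derived from its own TaskContext and only then commits. The sketch below illustrates that idea in isolation; StagingWriter and its methods are made-up names for this example, not part of the patch.

  import org.apache.spark.TaskContext

  // Hypothetical writer interface, used only to illustrate the staging idea.
  trait StagingWriter {
    def writeColumn(part: Int, column: Int, bytes: Array[Byte], tempDir: String): Unit
    def commit(part: Int, tempDir: String): Unit
  }

  // Every task attempt stages its files under a directory unique to the attempt,
  // so a speculative copy of the same partition can never clobber the files of
  // the attempt that finishes first.
  def writePartition(writer: StagingWriter, context: TaskContext,
      columns: Seq[Array[Byte]]): Long = {
    val attemptDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
    var written = 0L
    columns.zipWithIndex.foreach { case (bytes, col) =>
      writer.writeColumn(context.partitionId, col, bytes, attemptDir)
      written += bytes.length
    }
    // Only the attempt that gets here promotes its staged files into place.
    writer.commit(context.partitionId, attemptDir)
    written
  }
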
@@ -208,3 +201,24 @@ class MemoryStoreSinkOperator extends TerminalOperator {
   override def processPartition(split: Int, iter: Iterator[_]): Iterator[_] =
     throw new UnsupportedOperationException("CacheSinkOperator.processPartition()")
 }
+
+object MemoryStoreSinkOperator {
+  def processOffHeapSinkPartition(op: OperatorSerializationWrapper[MemoryStoreSinkOperator],
+    offHeapWriter: OffHeapTableWriter) = {
+    def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
+      op.logDebug("Started executing mapPartitions for operator: " + op)
+      val partId = context.partitionId
+      val partition = iter.next().asInstanceOf[TablePartition]
+      val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
+      var writeBytes: Long = 0
+      partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
+        offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir)
+        writeBytes += buf.limit
+      }
+      offHeapWriter.commitPartition(partId, taskTmpDir)
+      op.logDebug("Finished executing mapPartitions for operator: " + op)
+      writeBytes
+    }
+    writeFiles _
+  }
+}
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 9193b2c2..2d2a62bd 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -76,6 +76,12 @@ abstract class OffHeapTableWriter extends Serializable {
   /** Write the data of a partition of a given column. Called only on worker nodes. */
   def writeColumnPartition(column: Int, part: Int, data: ByteBuffer)
+
+  def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
+
+  def commitPartition(part: Int, tempDir: String)
+
+  def cleanTmpPath()
 }
 
 /** Responsible for creating OffHeapStorageClients. Will be called on master and worker nodes. */
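
The three abstract methods added here define the contract every off-heap writer now has to follow: stage one column of one partition under a caller-supplied temp directory, promote a partition's staged files on commit, and clean the staging area from the driver at the end. As a rough illustration of that contract (and only an illustration -- the real implementation is the Tachyon writer further below), here is what an equivalent writer over the local filesystem could look like; the class name and directory layout are invented for the example.

  import java.io.{File, FileOutputStream}
  import java.nio.ByteBuffer

  // Toy local-filesystem writer following the same write-to-temp /
  // commit-by-rename / clean-up contract as OffHeapTableWriter.
  class LocalFsTableWriter(root: String) {
    private val temp = new File(root, "_temp")

    /** Stage one column of one partition under the task attempt's temp dir. */
    def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
      val target = new File(new File(new File(temp, tempDir), column.toString), part.toString)
      target.getParentFile.mkdirs()
      val out = new FileOutputStream(target)
      try out.getChannel.write(data.duplicate()) finally out.close()
    }

    /** Promote the staged files of one attempt into their final location. */
    def commitPartition(part: Int, tempDir: String) {
      val staged = new File(temp, tempDir)
      Option(staged.listFiles()).getOrElse(Array.empty[File]).foreach { columnDir =>
        val dest = new File(new File(root, "COL" + columnDir.getName), part.toString)
        dest.getParentFile.mkdirs()
        new File(columnDir, part.toString).renameTo(dest)
      }
    }

    /** Drop leftover staging data, e.g. from speculative attempts that lost. */
    def cleanTmpPath() {
      def delete(f: File) {
        Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(delete)
        f.delete()
      }
      if (temp.exists()) delete(temp)
    }
  }
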
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 5a1288b0..8b889981 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -20,7 +20,8 @@ package shark.tachyon
 import java.nio.ByteBuffer
 
 import tachyon.client.WriteType
-
+import tachyon.Constants
+import tachyon.master.MasterInfo
 import shark.LogHelper
 import shark.execution.serialization.JavaSerializer
 import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}
@@ -30,7 +31,7 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
 
   // Re-instantiated upon deserialization, the first time it's referenced.
   @transient lazy val tfs = OffHeapStorageClient.client.asInstanceOf[TachyonStorageClient].tfs
-
+  val TEMP = "_temperary"
   var rawTableId: Int = -1
 
   override def createTable() {
@@ -55,4 +56,29 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
     outStream.write(data.array(), 0, data.limit())
     outStream.close()
   }
+
+  override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
+    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
+      + column + Constants.PATH_SEPARATOR + part)
+    val file = tfs.getFile(fid)
+    val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
+    outStream.write(data.array(), 0, data.limit())
+    outStream.close()
+  }
+
+  override def commitPartition(part: Int, tempDir: String) {
+    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    (0 until rawTable.getColumns()).foreach { column =>
+      tfs.rename(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
+        + column + Constants.PATH_SEPARATOR + part, rawTable.getPath() + Constants.PATH_SEPARATOR
+        + MasterInfo.COL + column + Constants.PATH_SEPARATOR + part)
+    }
+    tfs.delete(tmpPath + Constants.PATH_SEPARATOR + tempDir, true)
+  }
+
+  override def cleanTmpPath() {
+    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    tfs.delete(tmpPath, true)
+  }
 }

From ca758cd77f05567cf415efdab86f58e6c7e1da76 Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Wed, 11 Jun 2014 16:11:35 +0800
Subject: [PATCH 2/8] change the sequence of writing files

---
 .../scala/shark/tachyon/TachyonOffHeapTableWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 8b889981..4f8267d1 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -69,7 +69,7 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
 
   override def commitPartition(part: Int, tempDir: String) {
     val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
-    (0 until rawTable.getColumns()).foreach { column =>
+    (0 until rawTable.getColumns()).reverse.foreach { column =>
       tfs.rename(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
         + column + Constants.PATH_SEPARATOR + part, rawTable.getPath() + Constants.PATH_SEPARATOR
         + MasterInfo.COL + column + Constants.PATH_SEPARATOR + part)

From 062a6f2c6ad3b4a017e58f8eb4883679359811b9 Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Thu, 12 Jun 2014 13:23:56 +0800
Subject: [PATCH 3/8] format modification

---
 src/main/scala/shark/execution/MemoryStoreSinkOperator.scala | 4 ++--
 src/main/scala/shark/memstore2/OffHeapStorageClient.scala    | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index b10a00c6..e1a8eb81 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -206,7 +206,7 @@ object MemoryStoreSinkOperator {
   def processOffHeapSinkPartition(op: OperatorSerializationWrapper[MemoryStoreSinkOperator],
     offHeapWriter: OffHeapTableWriter) = {
     def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
-      op.logDebug("Started executing mapPartitions for operator: " + op)
+      op.logDebug("Started executing writeFiles for operator: " + op)
       val partId = context.partitionId
       val partition = iter.next().asInstanceOf[TablePartition]
       val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
@@ -216,7 +216,7 @@ object MemoryStoreSinkOperator {
         writeBytes += buf.limit
       }
       offHeapWriter.commitPartition(partId, taskTmpDir)
-      op.logDebug("Finished executing mapPartitions for operator: " + op)
+      op.logDebug("Finished executing writeFiles for operator: " + op)
       writeBytes
     }
     writeFiles _
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 2d2a62bd..230540d3 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -80,7 +80,7 @@ abstract class OffHeapTableWriter extends Serializable {
   def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
 
   def commitPartition(part: Int, tempDir: String)
-
+
   def cleanTmpPath()
 }

From ff5684169a3bcce398fc5fd53128d4b9e3e3bb8e Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Fri, 13 Jun 2014 09:54:40 +0800
Subject: [PATCH 4/8] solve filenotexsitexception when table has an empty partition

---
 .../shark/execution/MemoryStoreSinkOperator.scala    | 10 ++++------
 .../scala/shark/memstore2/OffHeapStorageClient.scala |  2 +-
 .../shark/tachyon/TachyonOffHeapTableWriter.scala    | 12 +++++++-----
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index e1a8eb81..0afbf043 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -130,7 +130,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
       op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName))
       offHeapWriter.createTable()
       outputRDD.context.runJob(
-        outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(op, offHeapWriter))
+        outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
       offHeapWriter.cleanTmpPath()
     } else {
       // Run a job on the RDD that contains the query output to force the data into the memory
@@ -203,10 +203,8 @@ class MemoryStoreSinkOperator extends TerminalOperator {
 }
 
 object MemoryStoreSinkOperator {
-  def processOffHeapSinkPartition(op: OperatorSerializationWrapper[MemoryStoreSinkOperator],
-    offHeapWriter: OffHeapTableWriter) = {
+  def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter) = {
     def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
-      op.logDebug("Started executing writeFiles for operator: " + op)
       val partId = context.partitionId
       val partition = iter.next().asInstanceOf[TablePartition]
       val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
@@ -215,8 +213,8 @@ object MemoryStoreSinkOperator {
       offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir)
       writeBytes += buf.limit
     }
-    offHeapWriter.commitPartition(partId, taskTmpDir)
-    op.logDebug("Finished executing writeFiles for operator: " + op)
+    val numColumns = partition.columns.size + 1
+    offHeapWriter.commitPartition(partId, numColumns, taskTmpDir)
     writeBytes
   }
   writeFiles _
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 230540d3..0435de5a 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -79,7 +79,7 @@ abstract class OffHeapTableWriter extends Serializable {
 
   def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
 
-  def commitPartition(part: Int, tempDir: String)
+  def commitPartition(part: Int, numColumns: Int, tempDir: String)
 
   def cleanTmpPath()
 }
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 4f8267d1..d1533241 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -67,12 +67,14 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
     outStream.close()
   }
 
-  override def commitPartition(part: Int, tempDir: String) {
+  override def commitPartition(part: Int, numColumns: Int, tempDir: String) {
     val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
-    (0 until rawTable.getColumns()).reverse.foreach { column =>
-      tfs.rename(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
-        + column + Constants.PATH_SEPARATOR + part, rawTable.getPath() + Constants.PATH_SEPARATOR
-        + MasterInfo.COL + column + Constants.PATH_SEPARATOR + part)
+    (0 until numColumns).reverse.foreach { column =>
+      val srcPath = tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR +
+        column + Constants.PATH_SEPARATOR + part
+      val destPath = rawTable.getPath() + Constants.PATH_SEPARATOR +
+        MasterInfo.COL + column + Constants.PATH_SEPARATOR + part
+      tfs.rename(srcPath, destPath)
     }
     tfs.delete(tmpPath + Constants.PATH_SEPARATOR + tempDir, true)
   }
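
Patch 4 fixes the FileNotExistException for tables with an empty partition by committing the number of buffers a task actually staged (partition.columns.size plus one, since TablePartition.toOffHeap appears to emit a metadata buffer ahead of the data columns) instead of deriving the count from the raw table. To make the resulting directory layout concrete, the snippet below spells out the staging and final paths with plain string concatenation; it is illustrative only, with SEP standing in for Constants.PATH_SEPARATOR and "COL_" standing in for MasterInfo.COL.

  // Illustrative path helpers, not the patch's code.
  val SEP = "/"
  def stagedPath(table: String, tempDir: String, column: Int, part: Int): String =
    table + SEP + "_temperary" + SEP + tempDir + SEP + column + SEP + part
  def committedPath(table: String, column: Int, part: Int): String =
    table + SEP + "COL_" + column + SEP + part

  // For stage 2, partition 3, attempt 1 (tempDir "2_3_1") and numColumns = 3
  // (one metadata buffer plus two data columns), commitPartition renames, in
  // reverse column order:
  //   <table>/_temperary/2_3_1/2/3 -> <table>/COL_2/3
  //   <table>/_temperary/2_3_1/1/3 -> <table>/COL_1/3
  //   <table>/_temperary/2_3_1/0/3 -> <table>/COL_0/3
  // so the commit only ever touches files that were actually staged.
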
From 4094cab16e992b7a05af52e3f47729fc01e1eb9b Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Fri, 13 Jun 2014 09:58:34 +0800
Subject: [PATCH 5/8] clean code for writeColumnPartition

---
 src/main/scala/shark/execution/SparkLoadTask.scala | 13 ++++---------
 .../shark/memstore2/OffHeapStorageClient.scala     |  4 +---
 .../shark/tachyon/TachyonOffHeapTableWriter.scala  |  9 ---------
 3 files changed, 5 insertions(+), 21 deletions(-)

diff --git a/src/main/scala/shark/execution/SparkLoadTask.scala b/src/main/scala/shark/execution/SparkLoadTask.scala
index 64ae4567..0138c941 100644
--- a/src/main/scala/shark/execution/SparkLoadTask.scala
+++ b/src/main/scala/shark/execution/SparkLoadTask.scala
@@ -231,18 +231,13 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
         OffHeapStorageClient.client.dropTablePartition(tableKey, hivePartitionKeyOpt)
       }
       offHeapWriter.createTable()
-      transformedRdd = transformedRdd.mapPartitionsWithIndex { case(part, iter) =>
-        val partition = iter.next()
-        partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
-          offHeapWriter.writeColumnPartition(column, part, buf)
-        }
-        Iterator(partition)
-      }
+      transformedRdd.context.runJob(
+        transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
     } else {
       transformedRdd.persist(StorageLevel.MEMORY_AND_DISK)
+      transformedRdd.context.runJob(
+        transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
     }
-    transformedRdd.context.runJob(
-      transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
     if (work.cacheMode == CacheType.OFFHEAP) {
       offHeapWriter.setStats(statsAcc.value.toMap)
     }
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 0435de5a..082ac651 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -74,9 +74,7 @@ abstract class OffHeapTableWriter extends Serializable {
   /** Sets stats on this table. Called only on the driver node. */
   def setStats(indexToStats: collection.Map[Int, TablePartitionStats])
 
-  /** Write the data of a partition of a given column. Called only on worker nodes. */
-  def writeColumnPartition(column: Int, part: Int, data: ByteBuffer)
-
+  /** Write the data of a partition of a given column. */
   def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
 
   def commitPartition(part: Int, numColumns: Int, tempDir: String)
 
   def cleanTmpPath()
 }
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index d1533241..85a8d125 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -48,15 +48,6 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
   // This is only used on worker nodes.
   @transient lazy val rawTable = tfs.getRawTable(rawTableId)
 
-  override def writeColumnPartition(column: Int, part: Int, data: ByteBuffer) {
-    val rawColumn = rawTable.getRawColumn(column)
-    rawColumn.createPartition(part)
-    val file = rawColumn.getPartition(part)
-    val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
-    outStream.write(data.array(), 0, data.limit())
-    outStream.close()
-  }
-
   override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
     val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
    val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
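
With the old per-call write logic gone, SparkLoadTask and MemoryStoreSinkOperator now drive the same writeFiles function through SparkContext.runJob, which hands every task its own TaskContext and returns one result per partition. The example below is a self-contained illustration of that runJob pattern (it only counts elements; in the patches the function stages and commits off-heap files and returns the bytes written):

  import org.apache.spark.{SparkConf, SparkContext, TaskContext}

  object RunJobExample {
    def main(args: Array[String]) {
      val sc = new SparkContext(new SparkConf().setAppName("runJob-example").setMaster("local[2]"))
      val rdd = sc.parallelize(1 to 100, 4)

      // runJob applies a (TaskContext, Iterator[T]) => U function to every
      // partition and returns an Array[U], one entry per partition.
      val perPartition: Array[Long] = sc.runJob(rdd,
        (context: TaskContext, iter: Iterator[Int]) => {
          var n = 0L
          iter.foreach(_ => n += 1)
          n
        })

      println(perPartition.mkString(", "))
      sc.stop()
    }
  }
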
From 579f6ea52613e40e83c4b2d8e532b858d93abdc4 Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Wed, 3 Sep 2014 12:01:44 +0800
Subject: [PATCH 6/8] merge PR327

---
 src/main/scala/shark/SharkConfVars.scala        | 10 ++++++++++
 .../execution/MemoryStoreSinkOperator.scala     | 17 ++++++++++-------
 .../scala/shark/execution/SparkLoadTask.scala   |  3 ++-
 .../shark/memstore2/OffHeapStorageClient.scala  |  4 ++++
 .../tachyon/TachyonOffHeapTableWriter.scala     | 13 +++++++++----
 5 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/src/main/scala/shark/SharkConfVars.scala b/src/main/scala/shark/SharkConfVars.scala
index b5592024..d2fe00f2 100755
--- a/src/main/scala/shark/SharkConfVars.scala
+++ b/src/main/scala/shark/SharkConfVars.scala
@@ -57,6 +57,16 @@ object SharkConfVars {
   // Number of mappers to force for table scan jobs
   val NUM_MAPPERS = new ConfVar("shark.map.tasks", -1)
 
+  // WriteType for Tachyon off-heap table writer,e.g., "TRY_CACHE", "MUST_CACHE",
+  // "CACHE_THROUGH", "THROUGH".
+  // For the reliability concern, we strongly recommend to use the default "CACHE_THROUGH",
+  // which means to write the table synchronously to the under fs, and cache the host columns.
+  // Both "TRY_CACHE" and "MUST_CACHE" options only cache the table with better write
+  // performance. However be careful to use those two options! If the entire table
+  // cannot be fully cached, some data part will be evicted and lost forever.
+  // "THROUGH" only writes the table to under fs and with no cache at all.
+  val TACHYON_WRITER_WRITETYPE = new ConfVar("shark.tachyon.writetype", "CACHE_THROUGH")
+
   // Add Shark configuration variables and their default values to the given conf,
   // so default values show up in 'set'.
   def initializeWithDefaults(conf: Configuration) {
diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index 0afbf043..092c3413 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -18,20 +18,18 @@
 package shark.execution
 
 import java.nio.ByteBuffer
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.BeanProperty
-
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.io.Writable
-
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.storage.StorageLevel
-
 import shark.{SharkConfVars, SharkEnv}
 import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
 import shark.memstore2._
-
 import org.apache.spark.TaskContext
+import org.apache.spark.SerializableWritable
+import org.apache.spark.broadcast.Broadcast
 /**
  * Cache the RDD and force evaluate it (so the cache is filled).
  */
@@ -129,8 +127,11 @@ class MemoryStoreSinkOperator extends TerminalOperator {
       // Put the table in off-heap storage.
       op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName))
       offHeapWriter.createTable()
+      val broadcastedHiveConf
+        = outputRDD.context.broadcast(new SerializableWritable(op.getLocalHconf))
       outputRDD.context.runJob(
-        outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
+        outputRDD,
+        MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter, broadcastedHiveConf))
       offHeapWriter.cleanTmpPath()
     } else {
       // Run a job on the RDD that contains the query output to force the data into the memory
@@ -203,13 +204,15 @@ class MemoryStoreSinkOperator extends TerminalOperator {
 }
 
 object MemoryStoreSinkOperator {
-  def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter) = {
+  def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter,
+    broadcastedHiveConf: Broadcast[SerializableWritable[HiveConf]]) = {
     def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
       val partId = context.partitionId
       val partition = iter.next().asInstanceOf[TablePartition]
       val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
       var writeBytes: Long = 0
       partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
+        offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value)
         offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir)
         writeBytes += buf.limit
       }
diff --git a/src/main/scala/shark/execution/SparkLoadTask.scala b/src/main/scala/shark/execution/SparkLoadTask.scala
index 0138c941..f3b4527a 100644
--- a/src/main/scala/shark/execution/SparkLoadTask.scala
+++ b/src/main/scala/shark/execution/SparkLoadTask.scala
@@ -232,7 +232,8 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
       }
       offHeapWriter.createTable()
       transformedRdd.context.runJob(
-        transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
+        transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter,
+          broadcastedHiveConf))
     } else {
       transformedRdd.persist(StorageLevel.MEMORY_AND_DISK)
       transformedRdd.context.runJob(
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 082ac651..5648968e 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -20,6 +20,9 @@ package shark.memstore2
 import java.util
 import java.nio.ByteBuffer
 
+import scala.reflect.BeanProperty
+
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.spark.rdd.RDD
 
 import shark.LogHelper
@@ -67,6 +70,7 @@ abstract class OffHeapStorageClient {
 }
 
 abstract class OffHeapTableWriter extends Serializable {
+  @transient @BeanProperty var localHconf: HiveConf = _
 
   /** Creates this table. Called only on the driver node. */
   def createTable()
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 85a8d125..776d1377 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -19,12 +19,15 @@ package shark.tachyon
 
 import java.nio.ByteBuffer
 
+import scala.reflect.BeanProperty
+
+import shark.{LogHelper, SharkConfVars}
+import shark.execution.serialization.JavaSerializer
+import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}
+
 import tachyon.client.WriteType
 import tachyon.Constants
 import tachyon.master.MasterInfo
-import shark.LogHelper
-import shark.execution.serialization.JavaSerializer
-import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}
 
 class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int)
   extends OffHeapTableWriter with LogHelper {
@@ -53,7 +56,9 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
     val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
       + column + Constants.PATH_SEPARATOR + part)
     val file = tfs.getFile(fid)
-    val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
+    val writeType: WriteType = WriteType.valueOf(
+      SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
+    val outStream = file.getOutStream(writeType)
     outStream.write(data.array(), 0, data.limit())
     outStream.close()
   }
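
PR327 makes the Tachyon write type configurable instead of hard-coding CACHE_THROUGH. Two things have to line up for that: the HiveConf, which is not Serializable, is shipped to executors wrapped in a broadcast SerializableWritable, and each task resolves the configured name to a tachyon.client.WriteType. The sketch below shows both pieces in isolation, assuming an existing SparkContext and HiveConf; the helper names and the fallback handling are mine, not the patch's.

  import org.apache.hadoop.hive.conf.HiveConf
  import org.apache.spark.{SerializableWritable, SparkContext}
  import tachyon.client.WriteType

  // Driver side: Configuration/HiveConf is Writable but not Serializable, so it
  // is wrapped before being broadcast; tasks read it back via .value.value.
  def broadcastConf(sc: SparkContext, localHconf: HiveConf) =
    sc.broadcast(new SerializableWritable(localHconf))

  // Worker side: map the configured string onto a Tachyon WriteType, falling
  // back to the recommended CACHE_THROUGH if the value is not a valid name.
  def chosenWriteType(conf: HiveConf): WriteType = {
    val name = conf.get("shark.tachyon.writetype", "CACHE_THROUGH")
    try WriteType.valueOf(name) catch {
      case _: IllegalArgumentException => WriteType.CACHE_THROUGH
    }
  }
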
From 77684be66294a17ecebe255640b5e2c5729e3550 Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Wed, 3 Sep 2014 12:02:01 +0800
Subject: [PATCH 7/8] change Tachyon version to 0.5.0

---
 project/SharkBuild.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/SharkBuild.scala b/project/SharkBuild.scala
index 45a35042..c1f06d9a 100755
--- a/project/SharkBuild.scala
+++ b/project/SharkBuild.scala
@@ -56,7 +56,7 @@ object SharkBuild extends Build {
 
   // Whether to build Shark with Tachyon jar.
   val TACHYON_ENABLED = true
-  val TACHYON_VERSION = "0.4.1"
+  val TACHYON_VERSION = "0.5.0"
 
   lazy val root = Project(
     id = "root",

From d1590073030e654a782aad8dd36878df8798fa48 Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Wed, 3 Sep 2014 12:12:27 +0800
Subject: [PATCH 8/8] some format fix

---
 .../execution/MemoryStoreSinkOperator.scala | 14 +++++++----
 .../memstore2/OffHeapStorageClient.scala    |  4 ++--
 .../tachyon/TachyonOffHeapTableWriter.scala | 23 ++++++++-----------
 3 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index 092c3413..956b2d27 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -18,18 +18,22 @@
 package shark.execution
 
 import java.nio.ByteBuffer
+
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.BeanProperty
+
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.io.Writable
-import org.apache.spark.rdd.{RDD, UnionRDD}
+
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.{TaskContext, SerializableWritable}
+
 import shark.{SharkConfVars, SharkEnv}
 import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
 import shark.memstore2._
-import org.apache.spark.TaskContext
-import org.apache.spark.SerializableWritable
-import org.apache.spark.broadcast.Broadcast
+
 /**
  * Cache the RDD and force evaluate it (so the cache is filled).
  */
@@ -78,7 +82,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
     localHconf.setInt(SharkConfVars.COLUMN_BUILDER_PARTITION_SIZE.varname, partitionSize)
     localHconf.setBoolean(SharkConfVars.COLUMNAR_COMPRESSION.varname, shouldCompress)
   }
-
+
   override def execute(): RDD[_] = {
     val inputRdd = if (parentOperators.size == 1) executeParents().head._2 else null
 
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 5648968e..364fe89c 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -80,9 +80,9 @@ abstract class OffHeapTableWriter extends Serializable {
   /** Write the data of a partition of a given column. Called only on worker nodes. */
   def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
-
+
   def commitPartition(part: Int, numColumns: Int, tempDir: String)
-
+
   def cleanTmpPath()
 }
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 776d1377..04c45563 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -26,8 +26,8 @@ import shark.execution.serialization.JavaSerializer
 import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}
 
 import tachyon.client.WriteType
-import tachyon.Constants
 import tachyon.master.MasterInfo
+import tachyon.util.CommonUtils
 
 class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int)
   extends OffHeapTableWriter with LogHelper {
@@ -52,9 +52,8 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
   @transient lazy val rawTable = tfs.getRawTable(rawTableId)
 
   override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
-    val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
-      + column + Constants.PATH_SEPARATOR + part)
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
+    val fid = tfs.createFile(CommonUtils.concat(tmpPath, tempDir, column + "", part + ""))
     val file = tfs.getFile(fid)
     val writeType: WriteType = WriteType.valueOf(
       SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
@@ -62,21 +61,19 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
     outStream.write(data.array(), 0, data.limit())
     outStream.close()
   }
-
+
   override def commitPartition(part: Int, numColumns: Int, tempDir: String) {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
     (0 until numColumns).reverse.foreach { column =>
-      val srcPath = tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR +
-        column + Constants.PATH_SEPARATOR + part
-      val destPath = rawTable.getPath() + Constants.PATH_SEPARATOR +
-        MasterInfo.COL + column + Constants.PATH_SEPARATOR + part
+      val srcPath = CommonUtils.concat(tmpPath, tempDir, column + "", part + "")
+      val destPath = CommonUtils.concat(rawTable.getPath(), MasterInfo.COL, column + "", part + "")
       tfs.rename(srcPath, destPath)
     }
-    tfs.delete(tmpPath + Constants.PATH_SEPARATOR + tempDir, true)
+    tfs.delete(CommonUtils.concat(tmpPath, tempDir), true)
   }
-
+
   override def cleanTmpPath() {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
     tfs.delete(tmpPath, true)
   }
 }
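
The last patch swaps the hand-rolled "path + Constants.PATH_SEPARATOR + ..." concatenation for Tachyon's CommonUtils.concat, which joins its arguments with a single path separator. As a rough Scala equivalent of what such a join does (illustrative only; the code above relies on Tachyon's own implementation):

  // Rough stand-in for a varargs path join, shown only to make the cleanup concrete.
  def concat(first: String, rest: Any*): String =
    (first.stripSuffix("/") +: rest.map(_.toString.stripPrefix("/").stripSuffix("/"))).mkString("/")

  // concat("/tables/t1", "_temperary", "2_3_1", 0, 5) == "/tables/t1/_temperary/2_3_1/0/5"

With the series applied, the Tachyon writer stages every task attempt's columns under <table>/_temperary/<stage>_<partition>_<attempt>/, commits the winning attempt by renaming those files into the table's column directories, and the driver removes the staging directory once the job finishes, so speculative execution no longer causes write conflicts.
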