Merge pull request #339 from shimingfei/tachyon-table-write
FileAlreadyExistException when writing a Tachyon table
rxin committed Oct 2, 2014
2 parents 401fca0 + d159007 commit 662d5ba
Showing 6 changed files with 85 additions and 31 deletions.
2 changes: 1 addition & 1 deletion project/SharkBuild.scala
@@ -56,7 +56,7 @@ object SharkBuild extends Build {

// Whether to build Shark with Tachyon jar.
val TACHYON_ENABLED = true
val TACHYON_VERSION = "0.4.1"
val TACHYON_VERSION = "0.5.0"

lazy val root = Project(
id = "root",
10 changes: 10 additions & 0 deletions src/main/scala/shark/SharkConfVars.scala
@@ -57,6 +57,16 @@ object SharkConfVars {
// Number of mappers to force for table scan jobs
val NUM_MAPPERS = new ConfVar("shark.map.tasks", -1)

// WriteType for the Tachyon off-heap table writer, e.g. "TRY_CACHE", "MUST_CACHE",
// "CACHE_THROUGH", "THROUGH".
// For reliability we strongly recommend the default, "CACHE_THROUGH", which writes
// the table synchronously to the under file system and caches it as well.
// "TRY_CACHE" and "MUST_CACHE" only cache the table, which gives better write
// performance. Use them with care: if the entire table cannot be fully cached,
// any evicted part of the data is lost for good.
// "THROUGH" only writes the table to the under file system, with no caching at all.
val TACHYON_WRITER_WRITETYPE = new ConfVar("shark.tachyon.writetype", "CACHE_THROUGH")

// Add Shark configuration variables and their default values to the given conf,
// so default values show up in 'set'.
def initializeWithDefaults(conf: Configuration) {
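A usage sketch (not part of this commit: the HiveConf instance and the "TRY_CACHE" choice are illustrative; SharkConfVars.getVar and TACHYON_WRITER_WRITETYPE come from the diff above):

import org.apache.hadoop.hive.conf.HiveConf
import tachyon.client.WriteType
import shark.SharkConfVars

// Override the default write type for a session, roughly what
// `SET shark.tachyon.writetype=TRY_CACHE;` does in the Shark CLI.
val conf = new HiveConf()
conf.set("shark.tachyon.writetype", "TRY_CACHE")

// Resolution mirrors TachyonOffHeapTableWriter.writePartitionColumn below;
// an unset key falls back to the default, "CACHE_THROUGH".
val writeType = WriteType.valueOf(
  SharkConfVars.getVar(conf, SharkConfVars.TACHYON_WRITER_WRITETYPE))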
41 changes: 30 additions & 11 deletions src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -22,16 +22,18 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.reflect.BeanProperty

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.io.Writable

import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.{TaskContext, SerializableWritable}

import shark.{SharkConfVars, SharkEnv}
import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
import shark.memstore2._


/**
* Cache the RDD and force evaluate it (so the cache is filled).
*/
@@ -129,16 +131,12 @@ class MemoryStoreSinkOperator extends TerminalOperator {
// Put the table in off-heap storage.
op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName))
offHeapWriter.createTable()
outputRDD = outputRDD.mapPartitionsWithIndex { case(part, iter) =>
val partition = iter.next()
partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
offHeapWriter.writeColumnPartition(column, part, buf)
}
Iterator(partition)
}
// Force evaluate so the data gets put into off-heap storage.
val broadcastedHiveConf
= outputRDD.context.broadcast(new SerializableWritable(op.getLocalHconf))
outputRDD.context.runJob(
outputRDD, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
outputRDD,
MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter, broadcastedHiveConf))
offHeapWriter.cleanTmpPath()
} else {
// Run a job on the RDD that contains the query output to force the data into the memory
// store. The statistics will also be collected by 'statsAcc' during job execution.
@@ -208,3 +206,24 @@ class MemoryStoreSinkOperator extends TerminalOperator {
override def processPartition(split: Int, iter: Iterator[_]): Iterator[_] =
throw new UnsupportedOperationException("CacheSinkOperator.processPartition()")
}

object MemoryStoreSinkOperator {
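/**
 * Builds the per-task function passed to SparkContext.runJob: it writes one
 * partition's columns to off-heap storage under a task-unique temp dir,
 * commits them into place, and returns the number of bytes written.
 */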
def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter,
broadcastedHiveConf: Broadcast[SerializableWritable[HiveConf]]) = {
def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
val partId = context.partitionId
val partition = iter.next().asInstanceOf[TablePartition]
val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
var writeBytes: Long = 0
offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value)
partition.toOffHeap.zipWithIndex.foreach { case (buf, column) =>
offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir)
writeBytes += buf.limit
}
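// toOffHeap yields a metadata buffer in addition to the data columns,
// so the commit below expects one extra column file per partition.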
val numColumns = partition.columns.size + 1
offHeapWriter.commitPartition(partId, numColumns, taskTmpDir)
writeBytes
}
writeFiles _
}
}
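For orientation, a sketch of the call site (mirroring the runJob invocation in the MemoryStoreSinkOperator hunk above; outputRDD, offHeapWriter, and broadcastedHiveConf are the names from this diff):

// Each task stages its files under stageId_partitionId_attemptId, so a retried
// or speculative attempt never collides with an earlier one; that collision was
// the FileAlreadyExistException this commit fixes.
val writeFunc = MemoryStoreSinkOperator.processOffHeapSinkPartition(
  offHeapWriter, broadcastedHiveConf)
val bytesWritten: Array[Long] = outputRDD.context.runJob(outputRDD, writeFunc)
offHeapWriter.cleanTmpPath() // drop the temp dir once every partition is committed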
14 changes: 5 additions & 9 deletions src/main/scala/shark/execution/SparkLoadTask.scala
@@ -231,18 +231,14 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHelper {
OffHeapStorageClient.client.dropTablePartition(tableKey, hivePartitionKeyOpt)
}
offHeapWriter.createTable()
transformedRdd = transformedRdd.mapPartitionsWithIndex { case(part, iter) =>
val partition = iter.next()
partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
offHeapWriter.writeColumnPartition(column, part, buf)
}
Iterator(partition)
}
transformedRdd.context.runJob(
transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter,
broadcastedHiveConf))
} else {
transformedRdd.persist(StorageLevel.MEMORY_AND_DISK)
transformedRdd.context.runJob(
transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
}
transformedRdd.context.runJob(
transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
if (work.cacheMode == CacheType.OFFHEAP) {
offHeapWriter.setStats(statsAcc.value.toMap)
}
12 changes: 10 additions & 2 deletions src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -20,6 +20,9 @@ package shark.memstore2
import java.util
import java.nio.ByteBuffer

import scala.reflect.BeanProperty

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.rdd.RDD

import shark.LogHelper
@@ -67,15 +70,20 @@ abstract class OffHeapStorageClient {
}

abstract class OffHeapTableWriter extends Serializable {
@transient @BeanProperty var localHconf: HiveConf = _

/** Creates this table. Called only on the driver node. */
def createTable()

/** Sets stats on this table. Called only on the driver node. */
def setStats(indexToStats: collection.Map[Int, TablePartitionStats])

/** Write the data of a partition of a given column. Called only on worker nodes. */
def writeColumnPartition(column: Int, part: Int, data: ByteBuffer)
/** Write the data of a partition of a given column. Called only on worker nodes. */
def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)

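/** Moves this partition's column files from the task temp dir to their final
  * locations. Called only on worker nodes, after all columns are written. */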
def commitPartition(part: Int, numColumns: Int, tempDir: String)

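/** Deletes the table's temp directory. Called on the driver after the write job completes. */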
def cleanTmpPath()
}

/** Responsible for creating OffHeapStorageClients. Will be called on master and worker nodes. */
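Taken together, the writer lifecycle this API implies, as a sketch (writer stands for any concrete OffHeapTableWriter; the call sites match MemoryStoreSinkOperator above):

writer.createTable()                                       // driver, before the job runs
// inside each task:
writer.setLocalHconf(broadcastedHiveConf.value.value)      // setter generated by @BeanProperty
writer.writePartitionColumn(part, column, buf, taskTmpDir) // once per column buffer
writer.commitPartition(part, numColumns, taskTmpDir)       // rename staged files into place
// driver, after the job succeeds:
writer.cleanTmpPath()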
37 changes: 29 additions & 8 deletions .../shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -19,18 +19,22 @@ package shark.tachyon

import java.nio.ByteBuffer

import tachyon.client.WriteType
import scala.reflect.BeanProperty

import shark.LogHelper
import shark.{LogHelper, SharkConfVars}
import shark.execution.serialization.JavaSerializer
import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}

import tachyon.client.WriteType
import tachyon.master.MasterInfo
import tachyon.util.CommonUtils

class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int)
extends OffHeapTableWriter with LogHelper {

// Re-instantiated upon deserialization, the first time it's referenced.
@transient lazy val tfs = OffHeapStorageClient.client.asInstanceOf[TachyonStorageClient].tfs

val TEMP = "_temporary"
var rawTableId: Int = -1

override def createTable() {
@@ -47,12 +51,29 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int)
// This is only used on worker nodes.
@transient lazy val rawTable = tfs.getRawTable(rawTableId)

override def writeColumnPartition(column: Int, part: Int, data: ByteBuffer) {
val rawColumn = rawTable.getRawColumn(column)
rawColumn.createPartition(part)
val file = rawColumn.getPartition(part)
val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
val fid = tfs.createFile(CommonUtils.concat(tmpPath, tempDir, column + "", part + ""))
val file = tfs.getFile(fid)
val writeType: WriteType = WriteType.valueOf(
SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
val outStream = file.getOutStream(writeType)
outStream.write(data.array(), 0, data.limit())
outStream.close()
}

override def commitPartition(part: Int, numColumns: Int, tempDir: String) {
val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
(0 until numColumns).reverse.foreach { column =>
val srcPath = CommonUtils.concat(tmpPath, tempDir, column + "", part + "")
val destPath = CommonUtils.concat(rawTable.getPath(), MasterInfo.COL, column + "", part + "")
tfs.rename(srcPath, destPath)
}
tfs.delete(CommonUtils.concat(tmpPath, tempDir), true)
}

override def cleanTmpPath() {
val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
tfs.delete(tmpPath, true)
}
}
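Why the staging directory fixes the bug, sketched against the old and new code above (the IDs are illustrative):

// Before this change, every attempt of a task called rawColumn.createPartition(part)
// on the same Tachyon path, so a retried or speculative attempt failed with
// FileAlreadyExistException. Now each attempt stages under its own directory:
val attempt0Dir = "12_3_0" // stageId_partitionId_attemptId
val attempt1Dir = "12_3_1" // a retry or speculative copy writes to a distinct path
// commitPartition then renames the staged files into the final column location,
// and cleanTmpPath() removes the whole "_temporary" tree when the job is done.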
