Support coalescing reading for avro #5306

Merged
merged 9 commits on May 4, 2022
Changes from 1 commit
42 changes: 19 additions & 23 deletions integration_tests/src/main/python/avro_test.py
@@ -32,15 +32,22 @@

rapids_reader_types = ['PERFILE', 'COALESCING']

# 50 files for the coalescing reading case
coalescingPartitionNum = 50

def gen_avro_files(gen_list, out_path):
with_cpu_session(
lambda spark: gen_df(spark,
gen_list).repartition(coalescingPartitionNum).write.format("avro").save(out_path)
)


@pytest.mark.parametrize('v1_enabled_list', ["avro", ""], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_basic_read(spark_tmp_path, v1_enabled_list, reader_type):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
data_path = spark_tmp_path + '/AVRO_DATA'
# 50 files for the coalescing reading case
with_cpu_session(
lambda spark: gen_df(spark, gen_list).repartition(50).write.format("avro").save(data_path)
)
gen_avro_files(gen_list, data_path)

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
@@ -54,20 +61,11 @@ def test_basic_read(spark_tmp_path, v1_enabled_list, reader_type):
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_type):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
first_data_path = spark_tmp_path + '/AVRO_DATA/key=0/key2=20'
with_cpu_session(
lambda spark: gen_df(spark,
gen_list).repartition(50).write.format("avro").save(first_data_path))
second_data_path = spark_tmp_path + '/AVRO_DATA/key=1/key2=21'
with_cpu_session(
lambda spark: gen_df(spark,
gen_list).repartition(50).write.format("avro").save(second_data_path))
third_data_path = spark_tmp_path + '/AVRO_DATA/key=2/key2=22'
with_cpu_session(
lambda spark: gen_df(spark,
gen_list).repartition(50).write.format("avro").save(third_data_path))

data_path = spark_tmp_path + '/AVRO_DATA'
# generate partitioned files
for v in [0, 1, 2]:
out_path = data_path + '/key={}/key2=2{}'.format(v, v)
gen_avro_files(gen_list, out_path)

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
@@ -80,13 +78,11 @@ def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_type):
@pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_avro_input_meta(spark_tmp_path, v1_enabled_list, reader_type):
first_data_path = spark_tmp_path + '/AVRO_DATA/key=0'
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(first_data_path))
second_data_path = spark_tmp_path + '/AVRO_DATA/key=1'
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(second_data_path))
data_path = spark_tmp_path + '/AVRO_DATA'
for v in [0, 1]:
out_path = data_path + '/key={}'.format(v)
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(out_path))

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
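For context on the configuration exercised by these tests, below is a minimal sketch (not part of this PR's diff) of how an application might enable the coalescing Avro reader. Only spark.rapids.sql.format.avro.reader.type is taken from the tests above; the application name and input path are placeholders, and the RAPIDS Accelerator plus spark-avro are assumed to be on the classpath.

// --- illustrative sketch, not part of the diff ---
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("avro-coalescing-example") // hypothetical app name
  .config("spark.rapids.sql.format.avro.reader.type", "COALESCING")
  .getOrCreate()

// With the COALESCING reader type, many small Avro files under one directory
// can be combined into fewer, larger batches on read.
val df = spark.read.format("avro").load("/path/to/AVRO_DATA") // placeholder path
df.show()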
6 changes: 6 additions & 0 deletions pom.xml
@@ -911,6 +911,12 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

2 changes: 0 additions & 2 deletions sql-plugin/pom.xml
@@ -59,8 +59,6 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

@@ -22,7 +22,8 @@ import java.nio.charset.StandardCharsets
import scala.collection.mutable

import org.apache.avro.Schema
import org.apache.avro.file.{DataFileConstants, SeekableInput}
import org.apache.avro.file.DataFileConstants._
import org.apache.avro.file.SeekableInput
import org.apache.avro.io.{BinaryData, BinaryDecoder, DecoderFactory}
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}

@@ -61,22 +62,17 @@ private class SeekableInputStream(in: SeekableInput) extends InputStream with Se
/**
* The header information of an Avro file.
*/
class Header private[rapids] {
private[rapids] val meta = mutable.Map[String, Array[Byte]]()
private[rapids] val sync = new Array[Byte](DataFileConstants.SYNC_SIZE)
private[rapids] var headerSize: Option[Long] = None
case class Header(
meta: Map[String, Array[Byte]],
// Arrays in Scala are mutable, so keep it private to avoid unexpected updates.
private val syncBuffer: Array[Byte]) {

def firstBlockStart: Long = headerSize.getOrElse {
val out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM)
AvroFileWriter(out).writeHeader(this)
val newSize = out.getByteCount
headerSize = Some(newSize)
newSize
}
/** Get a copy of the sync marker. */
def sync: Array[Byte] = syncBuffer.clone

@transient
lazy val schema: Schema = {
getMetaString(DataFileConstants.SCHEMA)
getMetaString(SCHEMA)
.map(s => new Schema.Parser().setValidateDefaults(false).setValidate(false).parse(s))
.orNull
}
@@ -87,39 +83,38 @@ class Header private[rapids] {
}

object Header {
/** Compute header size in bytes for serialization */
def headerSizeInBytes(h: Header): Long = {
val out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM)
AvroFileWriter(out).writeHeader(h)
out.getByteCount
}

/**
* Merge the metadata of the given headers.
* Note: It does not check the compatibility of the headers.
* @param headers the headers whose metadata is to be merged.
* @return the first header but having the new merged metadata, or
* None if the input is empty.
* @return a header with the new merged metadata and the first header's
* sync marker, or None if the input is empty.
*/
def mergeMetadata(headers: Seq[Header]): Option[Header] = {
if (headers.isEmpty) {
None
} else if (headers.size == 1) {
Some(headers.head)
} else {
val mergedHeader = headers.reduce { (merged, h) =>
merged.meta ++= h.meta
merged
val mergedMeta = headers.map(_.meta).reduce { (merged, meta) =>
merged ++ meta
}
// need to re-compute the header size
mergedHeader.headerSize = None
Some(mergedHeader)
Some(Header(mergedMeta, headers.head.sync))
}
}

/** Test whether the two headers have the same sync marker */
def hasSameSync(h1: Header, h2: Header): Boolean = h1.sync.sameElements(h2.sync)

/**
* Test whether the two headers have conflicts in the metadata.
* A conflict means a key exists in both of the two headers' metadata,
* and maps to different values.
*/
def hasConflictInMetadata(h1: Header, h2: Header): Boolean = h1.meta.exists {
case (k, v) => h2.meta.contains(k) && !h2.meta.get(k).get.sameElements(v)
case (k, v) => h2.meta.get(k).exists(!_.sameElements(v))
}
}
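As a quick illustration of the Header helpers defined above, the sketch below builds two headers with made-up metadata and a shared sync marker; all values are hypothetical and serve only to show the sync, conflict, and merge checks.

// --- illustrative sketch, not part of the diff ---
val sync = Array.fill[Byte](16)(1.toByte) // Avro sync markers are SYNC_SIZE = 16 bytes
val h1 = Header(Map("avro.codec" -> "null".getBytes), sync)
val h2 = Header(Map("avro.codec" -> "snappy".getBytes), sync)

Header.hasSameSync(h1, h2)            // true: both carry the same 16-byte sync marker
Header.hasConflictInMetadata(h1, h2)  // true: "avro.codec" maps to different values
Header.mergeMetadata(Seq(h1, h2))     // Some(header) carrying the merged metadata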

@@ -140,29 +135,36 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
private val sin = new SeekableInputStream(si)
sin.seek(0) // seek to the start of file and get some meta info.
private var vin: BinaryDecoder = DecoderFactory.get.binaryDecoder(sin, vin);
private val header: Header = new Header()
private var header: Header = null
private var firstBlockStart: Long = 0

// store all blocks info
private val blocks: mutable.ArrayBuffer[BlockInfo] = mutable.ArrayBuffer.empty
private var blocks: Option[Seq[BlockInfo]] = None

initialize()

def getBlocks(): Seq[BlockInfo] = blocks.toSeq

def getHeader(): Header = header

private def initialize() = {
val magic = new Array[Byte](DataFileConstants.MAGIC.length)
vin.readFixed(magic)
def getHeaderSize(): Long = firstBlockStart

def getBlocks(): Seq[BlockInfo] = blocks.getOrElse {
val b = parseBlocks()
blocks = Some(b)
b
}

private def initialize(): Unit = {
// read magic
val magic = new Array[Byte](MAGIC.length)
vin.readFixed(magic)
magic match {
case Array(79, 98, 106, 1) => // current avro format
case Array(79, 98, 106, 0) => // old format
throw new UnsupportedOperationException("avro 1.2 format is not supported by the GPU")
case _ => throw new RuntimeException("Not an Avro data file.")
}

// read metadata map
val meta = mutable.Map[String, Array[Byte]]()
var l = vin.readMapStart().toInt
if (l > 0) {
do {
@@ -171,34 +173,36 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
val value = vin.readBytes(null)
val bb = new Array[Byte](value.remaining())
value.get(bb)
header.meta += (key -> bb)
meta += (key -> bb)
}
l = vin.mapNext().toInt
} while (l != 0)
}
vin.readFixed(header.sync)
firstBlockStart = sin.tell - vin.inputStream.available // get the first block Start address
header.headerSize = Some(firstBlockStart)
parseBlocks()
// read sync marker
val sync = new Array[Byte](SYNC_SIZE)
vin.readFixed(sync)
header = Header(meta.toMap, sync)
firstBlockStart = sin.tell - vin.inputStream.available
}

private def seek(position: Long): Unit = {
sin.seek(position)
vin = DecoderFactory.get().binaryDecoder(this.sin, vin);
}

private def parseBlocks(): Unit = {
private def parseBlocks(): Seq[BlockInfo] = {
if (firstBlockStart >= sin.length() || vin.isEnd()) {
// no blocks
return
return Seq.empty
}
val blocks = mutable.ArrayBuffer.empty[BlockInfo]
// buf is used as scratch space for encoding longs
val buf = new Array[Byte](12)
var blockStart = firstBlockStart
while (blockStart < sin.length()) {
seek(blockStart)
if (vin.isEnd()) {
return
return blocks.toSeq
}
val blockCount = vin.readLong()
val blockDataSize = vin.readLong()
@@ -211,13 +215,13 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
val blockDataSizeLen: Int = BinaryData.encodeLong(blockDataSize, buf, 0)

// (len of entries) + (len of block size) + (block size) + (sync size)
val blockLength = blockCountLen + blockDataSizeLen + blockDataSize +
DataFileConstants.SYNC_SIZE
val blockLength = blockCountLen + blockDataSizeLen + blockDataSize + SYNC_SIZE
blocks += BlockInfo(blockStart, blockLength, blockDataSize, blockCount)

// Do we need to check the SYNC BUFFER, or just let cudf do it?
blockStart += blockLength
}
blocks.toSeq
}

override def close(): Unit = {
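To make the block-length arithmetic in parseBlocks above concrete, here is a small worked sketch with assumed values. Avro stores each block's record count and byte size as zig-zag varints, so BinaryData.encodeLong is used to measure their encoded widths.

// --- illustrative sketch, not part of the diff ---
import org.apache.avro.file.DataFileConstants
import org.apache.avro.io.BinaryData

// Assumed example: a block holding 1000 records and 65536 bytes of data.
val buf = new Array[Byte](12)
val countLen = BinaryData.encodeLong(1000L, buf, 0)  // 2 bytes as a zig-zag varint
val sizeLen  = BinaryData.encodeLong(65536L, buf, 0) // 3 bytes as a zig-zag varint

// (len of entries) + (len of block size) + (block size) + (sync size)
val blockLength = countLen + sizeLen + 65536L + DataFileConstants.SYNC_SIZE
// blockLength == 2 + 3 + 65536 + 16 == 65557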
@@ -715,7 +715,7 @@ abstract class MultiFileCoalescingPartitionReaderBase(
* Write a header for a specific file format. If there is no header for the file format,
* just ignore it and return 0
*
* @param paths the paths of files to be coalcesed into a single batch
* @param paths the paths of files to be coalesced into a single batch
* @param buffer where the header will be written
* @return how many bytes written
*/