Add ignoreCorruptFiles feature for Parquet reader [databricks] #4742
Conversation
Signed-off-by: Bobby Wang <wbo4958@gmail.com>
build
We still need to do the same thing for text-based file formats like ORC/JSON.
build
CI failed on the Databricks env. Will check it.
There's quite a bit missing from this PR. It currently only addresses Parquet, but there are many other readers that need to support this config setting (or preclude being on the GPU if it is set). In addition, the error handling only covers the small section of the read where we are examining the footer, but we could get I/O errors or other exceptions when trying to read the row group data, decode it on the GPU, etc.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
Thanks Jason. I have updated the description in #4742 (comment) and submitted a new commit. It handles the cases of reading the footer and copying row groups, but it doesn't catch the exception when cuDF fails to decode the buffer. A decode failure may be caused by a bug in cuDF itself, so I tend to let that exception propagate. As for other readers like ORC/JSON, I'd like to handle them in follow-ups.
The same "but it might be a bug in the code" logic applies whether we're talking about cudf or the Spark/Parquet code. I'm OK with addressing this in a followup because it's a bit tricky to do (and may require some assistance within cudf), but we need to file a tracking issue for it since this isn't completely covered.
That's fine, but then please file the tracking issues and link to the original issue (or a new epic issue per discussion below). This PR is marked as closing the original issue, so please either unlink the issue, since we will not properly support ignoring corrupt files with just this PR, or file a separate issue (maybe an epic) to track fully implementing this feature.
Yeah, it's kind of tricky, but I don't have more ideas on how to do that. I think it's really not reasonable to treat a "correct" file as corrupted because of a bug in cudf/spark-rapids code itself. Hi @jlowe, I'd like to hear your opinion.
build
After discussing with @GaryShen2008, we think we should fall back from coalescing reading to cloud reading when ignoreCorruptFiles is set. Coalescing reading tries hard to coalesce many Parquet files at a time, so if one row group of one file is corrupted and we try-catch the cuDF decoding, we would end up ignoring all of the coalesced files, while PERFILE/multithreaded reading would only ignore the corrupted file.
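A rough sketch of that fallback decision, with illustrative names rather than the actual plugin types:

```scala
// Illustrative reader-selection logic: when ignoreCorruptFiles is set, prefer a
// per-file style reader (cloud/multithreaded) over the coalescing reader so that
// one corrupt file cannot invalidate a whole coalesced batch.
sealed trait ParquetReaderType
case object Coalescing extends ParquetReaderType
case object MultiThreaded extends ParquetReaderType

def chooseReaderType(requested: ParquetReaderType,
                     ignoreCorruptFiles: Boolean): ParquetReaderType = {
  if (ignoreCorruptFiles && requested == Coalescing) MultiThreaded else requested
}
```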
Nice catch!
As I said before, there's not much difference between the argument "oh no, there might be a bug in cudf/spark-rapids" vs. "oh no, there might be a bug in Apache Spark." The point is, we need to implement the feature to the best of our ability, as Spark has as well. This feature is not fully implemented until that is done, so if we're not doing it here, we need to file a followup issue to complete that work (and also the requested followup issues for all the other input formats that will not be addressed here).
build
build
Overall the code looks OK for Parquet, but there still need to be followup issues for ORC, CSV, JSON, etc. Also, it looks like Databricks may not support ignoreCorruptFiles, as the test failed on the CPU, not the GPU. We may need to skip that test on Databricks.
Yeah, I just tested on the Databricks env, which throws an exception on the CPU. But with this PR, corrupt files are ignored when ignoreCorruptFiles is set to true.
build
build
Closes #4727.

The GPU Parquet reader throws an exception when reading some corrupt Parquet files even if the user has set "spark.sql.files.ignoreCorruptFiles" to true, for both coalescing and multi-threaded reading. PERFILE reading is fine, because it is wrapped by FilePartitionReader, which already handles these exceptions; see https://github.com/apache/spark/blob/branch-3.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala#L39-L59
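For reference, the kind of handling a FilePartitionReader-style wrapper applies is sketched below. This is a simplified illustration, not the actual Spark source; the helper and its parameters are made up for the example.

```scala
import java.io.{FileNotFoundException, IOException}

// Simplified sketch of a FilePartitionReader-style wrapper: open each file's
// reader in turn and skip missing or corrupt files based on the two configs.
def readNextFile[T](files: Iterator[() => Iterator[T]],
                    ignoreMissingFiles: Boolean,
                    ignoreCorruptFiles: Boolean): Option[Iterator[T]] = {
  if (!files.hasNext) {
    None
  } else {
    try {
      val openReader = files.next()
      Some(openReader())
    } catch {
      case _: FileNotFoundException if ignoreMissingFiles =>
        // The file disappeared after planning; move on to the next one.
        readNextFile(files, ignoreMissingFiles, ignoreCorruptFiles)
      case _: IOException | _: RuntimeException if ignoreCorruptFiles =>
        // The file is unreadable; skip it instead of failing the task.
        readNextFile(files, ignoreMissingFiles, ignoreCorruptFiles)
    }
  }
}
```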
For multi-threaded reading, when the reader detects a bad footer or fails to copy a file into the HostMemoryBuffer, it ignores the whole file. For example, assume a file has 3 row groups: if the multi-threaded reader has successfully copied the first 2 row groups into the HostMemoryBuffer and then fails to copy the 3rd row group because of an IOException, it drops the 2 row groups it already loaded and returns an empty HostMemoryBuffer. The CPU reader, by contrast, may have already returned the first 2 row groups to the user.
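To make the difference from the CPU behavior explicit, the per-file handling described above can be pictured roughly like this; the buffer type and the read callback are placeholders, not the real plugin classes:

```scala
import java.io.IOException

// Placeholder for the host-side staging buffer a file is copied into before GPU decode.
case class HostBuffer(bytes: Array[Byte])

// Illustrative per-file step of the multi-threaded reader: either the footer is read
// and every row group is copied, or the whole file is dropped when ignoreCorruptFiles
// is enabled; partially copied row groups are not kept.
def copyFileToHost(readFooterAndCopyRowGroups: () => HostBuffer,
                   ignoreCorruptFiles: Boolean): Option[HostBuffer] = {
  try {
    Some(readFooterAndCopyRowGroups())
  } catch {
    case _: IOException | _: RuntimeException if ignoreCorruptFiles =>
      // Unlike the CPU path, nothing from this file is returned, even row groups
      // that were already copied successfully.
      None
  }
}
```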
For coalescing reading, we first calculate the offset and size for each file and then launch multiple threads to copy the row groups into fixed locations in a single HostMemoryBuffer. Assume we are coalescing 1.parquet, 2.parquet and 3.parquet: we allocate one HostMemoryBuffer large enough for all three files and launch 3 threads to copy each file into its own region of that buffer. If copying 2.parquet fails, the re-generated Parquet data would contain 1.parquet, an empty hole where 2.parquet should have been, and 3.parquet.

This PR only handles exceptions that happen while reading the footer and copying row groups. It does not catch exceptions raised while decoding on the GPU, so even with ignoreCorruptFiles enabled, an exception is still thrown if cuDF fails to decode the host buffer.
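For completeness, enabling the feature from a Spark application looks like the sketch below; the input path is a placeholder, and as noted above, failures during GPU decode are still not covered by this PR:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ignore-corrupt-files-example")
  .getOrCreate()

// Ask Spark (and, with this PR, the GPU Parquet reader) to skip corrupt files
// instead of failing the query.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Corrupt files under the placeholder path are skipped; readable files still produce rows.
val df = spark.read.parquet("/data/events/")
println(df.count())
```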