Skip to content

Commit

Permalink
use hadoop fs positioned-reading
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangli20 committed Sep 23, 2024
1 parent ca00a98 commit 1768bcc
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrowVersion>15.0.2</arrowVersion>
<protobufVersion>3.21.9</protobufVersion>
<hadoopClientApiVersion>3.5.1</hadoopClientApiVersion>
</properties>

<dependencyManagement>
Expand All @@ -42,6 +43,11 @@
<artifactId>spark-sql_${scalaVersion}</artifactId>
<version>${sparkVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoopClientApiVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions spark-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,38 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;

public class JniUtil {
public static void readFullyFromFSDataInputStream(FSDataInputStream in, long pos, ByteBuffer buf)
throws IOException {

synchronized (in) {
if (pos != in.getPos()) {
in.seek(pos);
}
ReadableByteChannel channel = Channels.newChannel(in);

try {
while (buf.hasRemaining()) {
if (channel.read(buf) == -1) {
if (((ByteBufferPositionedReadable) in).read(pos, buf) == -1) {
throw new EOFException("readFullyFromFSDataInputStream() got unexpected EOF");
}
pos += buf.position();
}

} catch (UnsupportedOperationException e) {
synchronized (in) { // failback to serialized read
while (buf.hasRemaining()) {
in.seek(pos);
if (in.read(buf) == -1) {
throw new EOFException("readFullyFromFSDataInputStream() got unexpected EOF");
}
pos += buf.position();
}
}
}
}

public static void writeFullyToFSDataOutputStream(FSDataOutputStream out, ByteBuffer buf) throws IOException {

synchronized (out) {
WritableByteChannel channel = Channels.newChannel(out);

Expand Down

0 comments on commit 1768bcc

Please sign in to comment.