Skip to content

Commit

Permalink
[Gluten-995] Fix memory leak for ClickHouse Backend
Browse files Browse the repository at this point in the history
Fix memory leak for ClickHouse Backend.

There are memory leak after executing BHJ.

Close apache#995.
  • Loading branch information
zzcclp committed Feb 21, 2023
1 parent 2b12a41 commit 26420f5
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public class OnHeapCopyShuffleInputStream implements ShuffleInputStream {

private final InputStream in;
private InputStream in;
private final boolean isCompressed;
private int bufferSize;
private long bytesRead = 0L;
Expand Down Expand Up @@ -78,6 +78,7 @@ public boolean isCompressed() {
public void close() {
try {
in.close();
in = null;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;

public class StorageJoinBuilder {
public class StorageJoinBuilder implements AutoCloseable {
private ShuffleInputStream in;

private int customizeBufferSize;
Expand Down Expand Up @@ -107,4 +107,13 @@ public void build() {
join,
structure);
}

@Override
public void close() throws Exception {
try {
in.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ case class ClickHouseBuildSideRelation(
)
// Build the hash table
storageJoinBuilder.build()
storageJoinBuilder.close()
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ object CHAggAndShuffleBenchmark extends SqlBasedBenchmark {
.setIfMissing("spark.executor.memory", memorySize)
.setIfMissing("spark.sql.files.maxPartitionBytes", "1G")
.setIfMissing("spark.sql.files.openCostInBytes", "1073741824")
.setIfMissing("spark.gluten.sql.columnar.coalesce.batches", "true")
.setIfMissing("spark.gluten.sql.columnar.coalesce.batches", "false")
.setIfMissing("spark.shuffle.manager", "sort")
.setIfMissing("spark.io.compression.codec", "SNAPPY")

Expand Down

0 comments on commit 26420f5

Please sign in to comment.