[SPARK-49673][CONNECT] Increase CONNECT_GRPC_ARROW_MAX_BATCH_SIZE to 0.7 * CONNECT_GRPC_MAX_MESSAGE_SIZE

### What changes were proposed in this pull request?
Increases the default `maxBatchSize` from 0.7 * 4MiB to 0.7 * 128MiB (128MiB = `CONNECT_GRPC_MAX_MESSAGE_SIZE`). This makes better use of the allowed maximum message size.
This limit is used when creating Arrow batches for the `SqlCommandResult` in the `SparkConnectPlanner` and for `ExecutePlanResponse.ArrowBatch` in `processAsArrowBatches`. This, for example, lets us return much larger `LocalRelation`s in the `SqlCommandResult` (e.g., for the `SHOW PARTITIONS` command) while still staying within the gRPC message size limit.
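
As a rough sketch of the sizing relationship (the `val` names here are illustrative, not the actual constants in the codebase):

```scala
// The 0.7 factor leaves headroom because Arrow batch sizes are estimated, not exact.
val grpcMaxMessageSize = 128L * 1024 * 1024             // CONNECT_GRPC_MAX_MESSAGE_SIZE default: 128MiB
val oldMaxBatchSize = (0.7 * 4L * 1024 * 1024).toLong   // old default budget: ~2.8MiB per batch
val newMaxBatchSize = (0.7 * grpcMaxMessageSize).toLong // new default budget: ~89.6MiB per batch
```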

### Why are the changes needed?
There are `SqlCommandResults` that exceed 0.7 * 4MiB.

### Does this PR introduce _any_ user-facing change?
Yes. `SqlCommandResult`s of up to 0.7 * 128MiB are now supported, instead of only up to 0.7 * 4MiB, and `ExecutePlanResponse`s now make better use of the 128MiB limit.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48122 from dillitz/increase-sql-command-batch-size.

Authored-by: Robert Dillitz <robert.dillitz@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
dillitz authored and HyukjinKwon committed Sep 18, 2024
1 parent 25d6b7a commit 669e63a
Showing 3 changed files with 24 additions and 3 deletions.
@@ -23,7 +23,7 @@ import java.util.Properties

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters._

import org.apache.commons.io.FileUtils
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult}
+import org.apache.spark.sql.connect.client.{RetryPolicy, SparkConnectClient, SparkResult}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession, SQLHelper}
@@ -1566,6 +1566,25 @@ class ClientE2ETestSuite
    val result = df.select(trim(col("col"), " ").as("trimmed_col")).collect()
    assert(result sameElements Array(Row("a"), Row("b"), Row("c")))
  }

test("SPARK-49673: new batch size, multiple batches") {
+    val maxBatchSize = spark.conf.get("spark.connect.grpc.arrow.maxBatchSize").dropRight(1).toInt
+    // Adjust client grpcMaxMessageSize to maxBatchSize (10MiB; set in RemoteSparkSession config)
+    val sparkWithLowerMaxMessageSize = SparkSession
+      .builder()
+      .client(
+        SparkConnectClient
+          .builder()
+          .userId("test")
+          .port(port)
+          .grpcMaxMessageSize(maxBatchSize)
+          .retryPolicy(RetryPolicy
+            .defaultPolicy()
+            .copy(maxRetries = Some(10), maxBackoff = Some(FiniteDuration(30, "s"))))
+          .build())
+      .create()
+    assert(sparkWithLowerMaxMessageSize.range(maxBatchSize).collect().length == maxBatchSize)
+  }
}

private[sql] case class ClassData(a: String, b: Int)
@@ -124,6 +124,8 @@ object SparkConnectServerUtils
      // to make the tests exercise reattach.
      "spark.connect.execute.reattachable.senderMaxStreamDuration=1s",
      "spark.connect.execute.reattachable.senderMaxStreamSize=123",
+      // Testing SPARK-49673, setting maxBatchSize to 10MiB
+      s"spark.connect.grpc.arrow.maxBatchSize=${10 * 1024 * 1024}",
      // Disable UI
      "spark.ui.enabled=false")
    Seq("--jars", catalystTestJar) ++ confs.flatMap(v => "--conf" :: v :: Nil)
@@ -63,7 +63,7 @@ object Connect
"conservatively use 70% of it because the size is not accurate but estimated.")
.version("3.4.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(4 * 1024 * 1024)
.createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)

val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
buildStaticConf("spark.connect.grpc.maxInboundMessageSize")
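
To illustrate the mechanism this config governs — a minimal sketch of size-budgeted batching with the 70% safety factor (assumed helper names; not Spark's actual `processAsArrowBatches` implementation):

```scala
// Group rows into batches whose estimated total size stays within 70% of the
// gRPC message limit; sizes are estimates, so the factor leaves headroom.
def chunkBySize[T](rows: Seq[T], estimatedSize: T => Long, maxMessageSize: Long): Seq[Seq[T]] = {
  val budget = (0.7 * maxMessageSize).toLong
  val batches = Seq.newBuilder[Seq[T]]
  var current = Vector.empty[T]
  var used = 0L
  for (row <- rows) {
    val size = estimatedSize(row)
    if (current.nonEmpty && used + size > budget) { // flush before the budget overflows
      batches += current
      current = Vector.empty[T]
      used = 0L
    }
    current = current :+ row
    used += size
  }
  if (current.nonEmpty) batches += current
  batches.result()
}
```

With the old 4MiB default, a result estimated at 60MiB would be split into roughly 22 batches; under the new 128MiB default it fits in a single batch while staying well below the gRPC limit.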
