From 446d6bb3fde9af88c9e566b19934fedec9f19f57 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 25 Jun 2024 01:57:27 -0400
Subject: [PATCH] fix(c/driver/postgresql): chunk large COPY payloads
PostgreSQL enforces a maximum size on a single protocol message
(1 GiB - 1); split up large COPY payloads to stay under that limit.
The server does not care about message boundaries in COPY mode, so we
can chunk the buffer naively.
Fixes #1921.
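For reviewers, the shape of the fix in isolation looks like the sketch
below. This is an illustration, not the driver code itself:
PutCopyDataChunked is a made-up name, and error reporting is reduced
to a boolean.

    // Sketch: stream a large buffer through libpq in chunks that stay
    // below PostgreSQL's per-message cap (1 GiB - 1); 512 MiB leaves
    // plenty of headroom.
    #include <algorithm>
    #include <cstdint>

    #include <libpq-fe.h>

    bool PutCopyDataChunked(PGconn* conn, const char* data, int64_t size) {
      constexpr int64_t kMaxChunk = 0x20000000;  // 512 MiB
      while (size > 0) {
        // PQputCopyData takes an int length; a 512 MiB chunk fits.
        const int to_write = static_cast<int>(std::min(size, kMaxChunk));
        if (PQputCopyData(conn, data, to_write) <= 0) {
          return false;  // caller can inspect PQerrorMessage(conn)
        }
        data += to_write;
        size -= to_write;
      }
      return true;
    }

In blocking mode PQputCopyData returns only 1 or -1, so treating a
result <= 0 as an error is safe here.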
---
 c/driver/postgresql/statement.cc | 25 +++++++++++++++++----
 .../tests/test_dbapi.py | 16 ++++++++++++++++
 2 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 93e88ee82c..ac792e21ed 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -20,6 +20,7 @@
#include "statement.h"
+#include <algorithm>
 #include <array>
 #include <cassert>
 #include <cerrno>
@@ -571,6 +572,12 @@ struct BindStream {
AdbcStatusCode ExecuteCopy(PGconn* conn, int64_t* rows_affected,
struct AdbcError* error) {
+ // https://github.com/apache/arrow-adbc/issues/1921: PostgreSQL has a max
+ // size for a single message that we need to respect (1 GiB - 1). Since
+ // the buffer can be chunked up as much as we want, go for 512 MiB as our
+ // limit.
+ // https://github.com/postgres/postgres/blob/23c5a0e7d43bc925c6001538f04a458933a11fc1/src/common/stringinfo.c#L28
+ constexpr int64_t kMaxCopyBufferSize = 0x20000000;
if (rows_affected) *rows_affected = 0;
PostgresCopyStreamWriter writer;
@@ -606,10 +613,20 @@ struct BindStream {
}
ArrowBuffer buffer = writer.WriteBuffer();
-      if (PQputCopyData(conn, reinterpret_cast<char*>(buffer.data), buffer.size_bytes) <=
-          0) {
- SetError(error, "Error writing tuple field data: %s", PQerrorMessage(conn));
- return ADBC_STATUS_IO;
+ {
+        auto* data = reinterpret_cast<char*>(buffer.data);
+ int64_t remaining = buffer.size_bytes;
+ while (remaining > 0) {
+ int64_t to_write = std::min(remaining, kMaxCopyBufferSize);
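+          // PQputCopyData takes an int length; to_write is capped at
+          // 512 MiB above, so the implicit narrowing in the call is safe.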
+ if (PQputCopyData(conn, data, to_write) <= 0) {
+ SetError(error, "Error writing tuple field data: %s", PQerrorMessage(conn));
+ return ADBC_STATUS_IO;
+ }
+ remaining -= to_write;
+ data += to_write;
+ }
}
if (rows_affected) *rows_affected += array->length;
diff --git a/python/adbc_driver_postgresql/tests/test_dbapi.py b/python/adbc_driver_postgresql/tests/test_dbapi.py
index 283e3fe687..94cd9f82d0 100644
--- a/python/adbc_driver_postgresql/tests/test_dbapi.py
+++ b/python/adbc_driver_postgresql/tests/test_dbapi.py
@@ -15,9 +15,11 @@
# specific language governing permissions and limitations
# under the License.
+import string
from pathlib import Path
from typing import Generator
+import numpy
import pyarrow
import pyarrow.dataset
import pytest
@@ -410,3 +412,17 @@ def test_ingest_temporary(postgres: dbapi.Connection) -> None:
assert cur.fetch_arrow_table() == temp2
cur.execute("SELECT * FROM temporary")
assert cur.fetch_arrow_table() == temp2
+
+
+def test_ingest_large(postgres: dbapi.Connection) -> None:
+ """Regression test for #1921."""
+    # More than 1 GiB of binary COPY data in one batch (framing included)
+    arr = pyarrow.array(
+        numpy.random.randint(-100, 100, size=4_000_000, dtype=numpy.int64)
+    )
+ batch = pyarrow.RecordBatch.from_pydict(
+ {char: arr for char in string.ascii_lowercase}
+ )
+ table = pyarrow.Table.from_batches([batch] * 4)
+ with postgres.cursor() as cur:
+ cur.adbc_ingest("test_ingest_large", table, mode="replace")
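
A quick size check for the test above (a sketch under the binary COPY
framing rules: a 2-byte field count per row and a 4-byte length prefix
per field; the test pins dtype=numpy.int64 so each value is 8 bytes on
every platform):

    #include <cstdint>
    #include <cstdio>

    int main() {
      // One batch in test_ingest_large: 26 columns x 4,000,000 int64 values.
      constexpr int64_t kRows = 4'000'000;
      constexpr int64_t kCols = 26;
      // Per row: 2-byte field count, then per field a 4-byte length prefix
      // followed by 8 bytes of int64 data.
      constexpr int64_t kRowBytes = 2 + kCols * (4 + 8);
      // Prints 1256000000 (~1.17 GiB), past the 1 GiB single-message cap.
      std::printf("COPY bytes per batch: %lld\n",
                  static_cast<long long>(kRows * kRowBytes));
    }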