Skip to content

Commit

Permalink
fix(c/driver/postgresql): chunk large COPY payloads
Browse files Browse the repository at this point in the history
PostgreSQL apparently has an internal limit - split up batches to
stay under that limit.  It doesn't care about message boundaries
in this mode, so we can chunk naively.

Fixes apache#1921.
  • Loading branch information
lidavidm committed Jun 25, 2024
1 parent c1ad8df commit 446d6bb
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
23 changes: 19 additions & 4 deletions c/driver/postgresql/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "statement.h"

#include <algorithm>
#include <array>
#include <cassert>
#include <cerrno>
Expand Down Expand Up @@ -571,6 +572,12 @@ struct BindStream {

AdbcStatusCode ExecuteCopy(PGconn* conn, int64_t* rows_affected,
struct AdbcError* error) {
// https://github.com/apache/arrow-adbc/issues/1921: PostgreSQL has a max
// size for a single message that we need to respect (1 GiB - 1). Since
// the buffer can be chunked up as much as we want, go for 512 MiB as our
// limit.
// https://github.com/postgres/postgres/blob/23c5a0e7d43bc925c6001538f04a458933a11fc1/src/common/stringinfo.c#L28
constexpr int64_t kMaxCopyBufferSize = 0x20000000;
if (rows_affected) *rows_affected = 0;

PostgresCopyStreamWriter writer;
Expand Down Expand Up @@ -606,10 +613,18 @@ struct BindStream {
}

ArrowBuffer buffer = writer.WriteBuffer();
if (PQputCopyData(conn, reinterpret_cast<char*>(buffer.data), buffer.size_bytes) <=
0) {
SetError(error, "Error writing tuple field data: %s", PQerrorMessage(conn));
return ADBC_STATUS_IO;
{
auto* data = reinterpret_cast<char*>(buffer.data);
int64_t remaining = buffer.size_bytes;
while (remaining > 0) {
int64_t to_write = std::min<int64_t>(remaining, kMaxCopyBufferSize);
if (PQputCopyData(conn, data, to_write) <= 0) {
SetError(error, "Error writing tuple field data: %s", PQerrorMessage(conn));
return ADBC_STATUS_IO;
}
remaining -= to_write;
data += to_write;
}
}

if (rows_affected) *rows_affected += array->length;
Expand Down
14 changes: 14 additions & 0 deletions python/adbc_driver_postgresql/tests/test_dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
# specific language governing permissions and limitations
# under the License.

import string
from pathlib import Path
from typing import Generator

import numpy
import pyarrow
import pyarrow.dataset
import pytest
Expand Down Expand Up @@ -410,3 +412,15 @@ def test_ingest_temporary(postgres: dbapi.Connection) -> None:
assert cur.fetch_arrow_table() == temp2
cur.execute("SELECT * FROM temporary")
assert cur.fetch_arrow_table() == temp2


def test_ingest_large(postgres: dbapi.Connection) -> None:
"""Regression test for #1921."""
# More than 1 GiB of data in one batch
arr = pyarrow.array(numpy.random.randint(-100, 100, size=4_000_000))
batch = pyarrow.RecordBatch.from_pydict(
{char: arr for char in string.ascii_lowercase}
)
table = pyarrow.Table.from_batches([batch] * 4)
with postgres.cursor() as cur:
cur.adbc_ingest("test_ingest_large", table, mode="replace")

0 comments on commit 446d6bb

Please sign in to comment.