Skip to content

Commit

Permalink
Backport to 2.2: [YSQL][#6069] Enable copy from stdin to work with ro…
Browse files Browse the repository at this point in the history
…ws_per_transaction option.

Summary:
**Background**
In Postgres, there is a flag called `ybDataSent` in the `TransactionState` structure.
When the output buffer is flushed, the flag is set to true, marking that a transparent restart is no longer possible.

In the `CopyFrom()` method, prior to processing, the `ybDataSent` flag is used to check whether the current query is inside another transaction block (i.e., a nested transaction).
If so, an error is thrown to prevent the `rows_per_transaction` option from being used.
This prevents previous transactions from getting committed while running batch commits.

**Problem**
Today, when a COPY FROM query is sourced from STDIN, it runs the `ReceiveCopyBegin()` function to flush the output buffer and ensure the front-end knows it can send data.
While flushing, the `ybDataSent` flag also gets set to true even though the query is not inside a nested transaction.

E.g.

```
create table t (a int);
copy t from stdin with (rows_per_transaction 10);
>> ERROR: ROWS_PER_TRANSACTION option is not supported in nested transaction

```
**Solution**
The flag should be explicitly turned off when the query is not inside a nested transaction.
Before executing the `ReceiveCopyBegin()` function, we store the current state of `ybDataSent` to determine whether the query is inside a nested transaction.
After executing the flush:
  - keep the flag set to true if inside a transaction block
  - reset the flag to false if outside a transaction block.

Test Plan: Jenkins: rebase: 2.2

Reviewers: jason, mihnea

Reviewed By: mihnea

Subscribers: yql, zyu

Differential Revision: https://phabricator.dev.yugabyte.com/D9699
  • Loading branch information
emhna committed Oct 26, 2020
1 parent 6486b61 commit 5525951
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 2 deletions.
125 changes: 124 additions & 1 deletion java/yb-pgsql/src/test/java/org/yb/pgsql/TestBatchCopyFrom.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,22 @@

package org.yb.pgsql;

import static org.yb.AssertionWrappers.fail;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yb.client.TestUtils;
Expand Down Expand Up @@ -194,7 +202,7 @@ public void testTempTableWithBatchTransaction() throws Exception {
}

@Test
public void testBatchTransactionInNestedTransaction() throws Exception {
public void testInNestedTransaction() throws Exception {
String absFilePath = getAbsFilePath("batchSize5-copyfrom.txt");
String copyFromTableName = "batchedTable";
int totalLines = 20;
Expand All @@ -221,4 +229,119 @@ public void testBatchTransactionInNestedTransaction() throws Exception {
assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 0L);
}
}

/**
 * Verifies that COPY ... FROM STDIN accepts the ROWS_PER_TRANSACTION option when it is issued
 * outside of an explicit transaction block, and that the expected number of rows is loaded.
 */
@Test
public void testStdinCopy() throws Exception {
  String absFilePath = getAbsFilePath("batch-copyfrom-stdin.txt");
  String tableName = "stdinBatchSizeTable";
  int totalLines = 5;
  int batchSize = 1;

  createFileInTmpDir(absFilePath, totalLines);

  try (Statement statement = connection.createStatement()) {
    statement.execute(String.format("CREATE TABLE %s (a int, b int, c int, d int)", tableName));
    CopyManager copyManager = new CopyManager((BaseConnection) connection);
    // The reader is owned by this test, so close it deterministically once the copy is done
    // instead of leaking the underlying file handle.
    try (BufferedReader reader = new BufferedReader(new FileReader(absFilePath))) {
      copyManager.copyIn(
          String.format(
              "COPY %s FROM STDIN WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %s)",
              tableName, batchSize),
          reader);
    }

    assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, (long) totalLines);
  }
}

/**
 * Verifies that COPY ... FROM STDIN with ROWS_PER_TRANSACTION is rejected when an earlier
 * statement has already run inside the same explicit transaction block, and that neither the
 * copy target nor the table written by the earlier statement ends up with any rows.
 */
@Test
public void tesStdinCopyInNestedTransactionWithPreviousTransaction() throws Exception {
  String absFilePath = getAbsFilePath("batch-copyfrom-stdin-nested.txt");
  String tableName = "stdinNestedBatchSizeTable";
  String dummyTableName = "dummyTable";
  int totalLines = 5;
  int batchSize = 1;

  createFileInTmpDir(absFilePath, totalLines);

  try (Statement statement = connection.createStatement()) {
    statement.execute(String.format("CREATE TABLE %s (a int, b int, c int, d int)", tableName));
    statement.execute(String.format("CREATE TABLE %s (a int)", dummyTableName));
    statement.execute("BEGIN TRANSACTION");
    statement.execute(String.format("INSERT INTO %s (a) VALUES (1)", dummyTableName));
    // The reader is closed by try-with-resources before the catch clause runs, so the file
    // handle is released whether the copy fails as expected or not.
    try (BufferedReader reader = new BufferedReader(new FileReader(absFilePath))) {
      CopyManager copyManager = new CopyManager((BaseConnection) connection);
      copyManager.copyIn(
          String.format(
              "COPY %s FROM STDIN WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %s)",
              tableName, batchSize),
          reader);
      fail(String.format("Statement did not fail: %s", INVALID_USAGE_ERROR_MSSG));
    } catch (SQLException e) {
      if (StringUtils.containsIgnoreCase(e.getMessage(), INVALID_USAGE_ERROR_MSSG)) {
        LOG.info("Expected exception", e);
      } else {
        fail(String.format("Unexpected Error Message. Got: '%s', Expected to contain: '%s'",
                           e.getMessage(), INVALID_USAGE_ERROR_MSSG));
      }
    }
    statement.execute("COMMIT TRANSACTION");

    // Both tables are expected to be empty: the copy was rejected and the preceding insert
    // must not have been committed.
    assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, 0L);
    assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 0L);
  }
}

/**
 * Verifies that COPY ... FROM STDIN with ROWS_PER_TRANSACTION succeeds when it is the first
 * statement after BEGIN, and that a statement issued after the copy in the same block is
 * still applied on COMMIT.
 */
@Test
public void tesStdinCopyInNestedTransactionWithProceedingTransaction() throws Exception {
  String absFilePath = getAbsFilePath("batch-copyfrom-stdin-nested.txt");
  String tableName = "stdinNestedBatchSizeTable";
  String dummyTableName = "dummyTable";
  int totalLines = 5;
  int batchSize = 2;

  createFileInTmpDir(absFilePath, totalLines);

  try (Statement statement = connection.createStatement()) {
    statement.execute(String.format("CREATE TABLE %s (a int, b int, c int, d int)", tableName));
    statement.execute(String.format("CREATE TABLE %s (a int)", dummyTableName));
    statement.execute("BEGIN TRANSACTION");
    CopyManager copyManager = new CopyManager((BaseConnection) connection);
    // Close the input reader as soon as the copy completes to avoid leaking the file handle.
    try (BufferedReader reader = new BufferedReader(new FileReader(absFilePath))) {
      copyManager.copyIn(
          String.format(
              "COPY %s FROM STDIN WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %s)",
              tableName, batchSize),
          reader);
    }
    statement.execute(String.format("INSERT INTO %s (a) VALUES (1)", dummyTableName));
    statement.execute("COMMIT TRANSACTION");

    assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, (long) totalLines);
    assertOneRow(statement, "SELECT COUNT(*) FROM " + dummyTableName, 1L);
  }
}

/**
 * Verifies that COPY ... FROM STDIN with ROWS_PER_TRANSACTION succeeds when it is the only
 * statement between BEGIN and COMMIT, and that the expected number of rows is loaded.
 */
@Test
public void tesStdinCopyInNestedTransactionWithoutOtherTransaction() throws Exception {
  String absFilePath = getAbsFilePath("batch-copyfrom-stdin.txt");
  String tableName = "stdinNestedBatchSizeTable";
  int totalLines = 5;
  int batchSize = 1;

  createFileInTmpDir(absFilePath, totalLines);

  try (Statement statement = connection.createStatement()) {
    statement.execute(String.format("CREATE TABLE %s (a int, b int, c int, d int)", tableName));
    statement.execute("BEGIN TRANSACTION");
    CopyManager copyManager = new CopyManager((BaseConnection) connection);
    // Close the input reader as soon as the copy completes to avoid leaking the file handle.
    try (BufferedReader reader = new BufferedReader(new FileReader(absFilePath))) {
      copyManager.copyIn(
          String.format(
              "COPY %s FROM STDIN WITH (FORMAT CSV, HEADER, ROWS_PER_TRANSACTION %s)",
              tableName, batchSize),
          reader);
    }
    statement.execute("COMMIT TRANSACTION");

    assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, (long) totalLines);
  }
}
}
9 changes: 9 additions & 0 deletions src/postgres/src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,15 @@ void YBMarkDataSent(void)
s->ybDataSent = true;
}

/*
 * Reset the ybDataSent flag on the current transaction state, recording that
 * no data has been sent to the client as part of this transaction.
 */
void YBMarkDataNotSent(void)
{
	TransactionState cur_state = CurrentTransactionState;

	cur_state->ybDataSent = false;
}

/*
* Whether some data has been transmitted to frontend as part of this transaction.
*/
Expand Down
14 changes: 13 additions & 1 deletion src/postgres/src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3309,9 +3309,21 @@ BeginCopyFrom(ParseState *pstate,
{
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
{
bool isDataSent = YBIsDataSent();
ReceiveCopyBegin(cstate);
else
/*
* ReceiveCopyBegin sends a message back to the client
* with the expected format of the copy data.
* This implicitly causes YB data to be marked as sent
* although the message does not contain any data from YB.
* So we can safely roll back YBIsDataSent to its previous value.
*/
if (!isDataSent) YBMarkDataNotSent();
}
else {
cstate->copy_file = stdin;
}
}
else
{
Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/include/access/xact.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ extern void ExitParallelMode(void);
extern bool IsInParallelMode(void);

extern void YBMarkDataSent(void);
extern void YBMarkDataNotSent(void);
extern bool YBIsDataSent(void);

#endif /* XACT_H */

0 comments on commit 5525951

Please sign in to comment.