-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Batch DML support #107
Batch DML support #107
Conversation
…-r2dbc into batch-dml # Conflicts: # src/main/java/com/google/cloud/spanner/r2dbc/SpannerStatement.java # src/test/java/com/google/cloud/spanner/r2dbc/it/SpannerIT.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, just need to switch to a unary call (good to know that it works with streaming; we've built resilient conversion between grpc and publishers, I guess).
PartialResultRowExtractor partialResultRowExtractor = new PartialResultRowExtractor(); | ||
|
||
Flux<PartialResultSet> resultSetFlux = | ||
this.client.executeStreamingSql( | ||
this.session, this.transaction, this.sql, params, this.statementBindings.getTypes()); | ||
|
||
if (statementType == StatementType.SELECT) { | ||
if (this.statementType == StatementType.SELECT) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this check anymore? DML does not go through this method anymore, and I don't imagine DDL will either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 and should probably rename to runSelectStatement
} | ||
|
||
return ObservableReactiveUtil | ||
.streamingCall(obs -> this.spanner.executeBatchDml(request.build(), obs)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not really a streaming call; use unaryCall()
here.
@@ -268,12 +292,13 @@ public void testEmptySelect() { | |||
/** | |||
* Executes a DML query and returns the rows updated. | |||
*/ | |||
private int executeDmlQuery(String sql) { | |||
private List<Integer> executeDmlQuery(String sql) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be kept as before, returning integer? B/c String sql
can only correspond to one statement.
|
||
if (statementType == StatementType.SELECT) { | ||
return structFlux.flatMap(struct -> runSingleStatement(struct, statementType)); | ||
if (this.statementType == StatementType.DML) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if you should just have a class hierarchy of statements -- SpannerDmlStatement
, SpannerDdlStatement
, SpannerReadStatement
. WDYT?
} | ||
// DML statements have to be executed sequentially because they need seqNo to be in order | ||
return structFlux.concatMapDelayError(struct -> runSingleStatement(struct, statementType)); | ||
Flux<Struct> structFlux = Flux.fromIterable(this.statementBindings.getBindings()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Else if SELECT
? Are you missing the DDL case?
The whole thing cloud be a switch statement, with the default throwing an exception.
Alternatively, see my suggestion about the class hierarchy.
PartialResultRowExtractor partialResultRowExtractor = new PartialResultRowExtractor(); | ||
|
||
Flux<PartialResultSet> resultSetFlux = | ||
this.client.executeStreamingSql( | ||
this.session, this.transaction, this.sql, params, this.statementBindings.getTypes()); | ||
|
||
if (statementType == StatementType.SELECT) { | ||
if (this.statementType == StatementType.SELECT) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 and should probably rename to runSelectStatement
Connection connection = Mono.from(connectionFactory.create()).block(); | ||
|
||
Mono.from(connection.beginTransaction()).block(); | ||
int rowsUpdated = Mono.from(connection.createStatement(sql).execute()) | ||
List<Integer> rowsUpdated = Flux.from(connection.createStatement(sql).execute()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/rowsUpdated/rowsUpdatedPerStatement
fixes #92