-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Statement parameter binding #87
Changes from 9 commits
c5a4315
cb64255
799b719
30eb940
b09ff89
4695d86
90881d8
145b282
3baaaa0
c8db22d
d890864
5e57ba0
74ade6a
d551ba9
32a429d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,13 +16,26 @@ | |
|
||
package com.google.cloud.spanner.r2dbc; | ||
|
||
import static java.util.Objects.requireNonNull; | ||
|
||
import com.google.cloud.spanner.r2dbc.client.Client; | ||
import com.google.cloud.spanner.r2dbc.codecs.Codec; | ||
import com.google.cloud.spanner.r2dbc.codecs.Codecs; | ||
import com.google.cloud.spanner.r2dbc.codecs.DefaultCodecs; | ||
import com.google.cloud.spanner.r2dbc.result.PartialResultRowExtractor; | ||
import com.google.protobuf.Struct; | ||
import com.google.protobuf.Struct.Builder; | ||
import com.google.spanner.v1.PartialResultSet; | ||
import com.google.spanner.v1.Session; | ||
import com.google.spanner.v1.Transaction; | ||
import com.google.spanner.v1.Type; | ||
import io.r2dbc.spi.Result; | ||
import io.r2dbc.spi.Statement; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import javax.annotation.Nullable; | ||
import org.reactivestreams.Publisher; | ||
import reactor.core.publisher.Flux; | ||
|
@@ -37,16 +50,23 @@ public class SpannerStatement implements Statement { | |
|
||
private Session session; | ||
|
||
private Transaction transaction; | ||
private SpannerTransactionContext transaction; | ||
|
||
private String sql; | ||
|
||
private LinkedList<Map<String, Object>> bindings = new LinkedList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in practice ArrayList is typically preferred over LinkedList because of caching advantages. |
||
|
||
private Codecs codecs = new DefaultCodecs(); | ||
|
||
private Map<String, Type> types = Collections.EMPTY_MAP; | ||
|
||
private Map<String, Codec> resolvedCodecs = new HashMap<>(); | ||
|
||
/** | ||
* Creates a Spanner statement for a given SQL statement. | ||
* | ||
* <p>If no transaction is present, a temporary strongly consistent readonly transaction will be | ||
* used. | ||
* | ||
* @param client cloud spanner client to use for performing the query operation | ||
* @param session current cloud spanner session | ||
* @param transaction current cloud spanner transaction, or empty if no transaction is started | ||
|
@@ -55,45 +75,88 @@ public class SpannerStatement implements Statement { | |
public SpannerStatement( | ||
Client client, | ||
Session session, | ||
@Nullable Transaction transaction, | ||
@Nullable SpannerTransactionContext transaction, | ||
String sql) { | ||
|
||
this.client = client; | ||
this.session = session; | ||
this.transaction = transaction; | ||
this.sql = sql; | ||
add(); | ||
} | ||
|
||
@Override | ||
public Statement add() { | ||
return null; | ||
this.bindings.add(new HashMap<>()); | ||
return this; | ||
} | ||
|
||
@Override | ||
public Statement bind(Object o, Object o1) { | ||
return null; | ||
public Statement bind(Object identifier, Object value) { | ||
requireNonNull(identifier); | ||
if (identifier instanceof String) { | ||
this.bindings.getLast().put((String)identifier, value); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LinkedList.getLast() is efficient, but if we keep a map of in-progress/incomplete bindings separately, then there is no need for getting it, and also no need to add an empty list in constructor. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess the only thing here would be that after completing the last one, you'd have to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea I think that might complicate things. Especially since the user doesn't need to call add() to close/finalize the final one. That would mean some sort of code in the execute() that would add on this final map, which feels out of place to be in in the execute() method. |
||
return this; | ||
} | ||
throw new IllegalArgumentException("Only String identifiers are supported"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could make this into a helper in the |
||
} | ||
|
||
@Override | ||
public Statement bind(int i, Object o) { | ||
return null; | ||
throw new IllegalArgumentException("Only named parameters are supported"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have an issue to track support for index based binding? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought we decided that we don't want to support that, because it would require parsing of the SQL string. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Either way, let's track this as an issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
@Override | ||
public Statement bindNull(Object o, Class<?> type) { | ||
return null; | ||
public Statement bindNull(Object identifier, Class<?> type) { | ||
return bind(identifier, null); | ||
} | ||
|
||
@Override | ||
public Statement bindNull(int i, Class<?> type) { | ||
return null; | ||
throw new IllegalArgumentException("Only named parameters are supported"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto |
||
} | ||
|
||
@Override | ||
public Publisher<? extends Result> execute() { | ||
List<Struct> paramsStructs = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there anything wrong with building the |
||
for (Map<String, Object> bindingsBatch : this.bindings) { | ||
Builder paramsStructBuilder = Struct.newBuilder(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor style thing - it would be better to qualify this |
||
Map<String, Type> types = this.types.isEmpty() ? new HashMap<>() : null; | ||
|
||
for (Map.Entry<String, Object> binding : bindingsBatch.entrySet()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to put the logic in a helper method and call it from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I would recommend this approach too. You could keep a |
||
String paramName = binding.getKey(); | ||
Codec codec = this.resolvedCodecs.computeIfAbsent(paramName, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same binding for all rows should be of the same type, right? Can we save time by only doing this on the first There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do we know that all rows will use the same type for a particular column? Is that the contract in R2DBC? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These aren't actually columns if i'm not mistaken. These are param tags in the query string, which means they don't need to be the same type. For example , you can get the size in bytes in googleSQL of a STRING or BYTES, so that single tag could take on either value. Furthermore, I think the codec can change too? For example, if the tag in the SQL is like |
||
name -> this.codecs.getCodec(binding.getValue())); | ||
paramsStructBuilder | ||
.putFields(paramName, codec.encode(binding.getValue())); | ||
|
||
if (this.types.isEmpty()) { | ||
types.put(paramName, | ||
Type.newBuilder().setCode(codec.getTypeCode()).build()); | ||
} | ||
|
||
} | ||
|
||
if (types != null) { | ||
this.types = types; | ||
} | ||
|
||
paramsStructs.add(paramsStructBuilder.build()); | ||
} | ||
|
||
Flux<Struct> structFlux = Flux.fromIterable(paramsStructs); | ||
|
||
if (this.sql != null && this.sql.trim().toLowerCase().startsWith("select")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add assertion for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know it's trivial logic, but can the second half of the condition go into a |
||
return structFlux.flatMap(this::runSingleStatement); | ||
} | ||
return structFlux.concatMapDelayError(this::runSingleStatement); | ||
elefeint marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
private Mono<? extends Result> runSingleStatement(Struct params) { | ||
PartialResultRowExtractor partialResultRowExtractor = new PartialResultRowExtractor(); | ||
|
||
return this.client.executeStreamingSql(this.session, this.transaction, this.sql) | ||
return this.client | ||
.executeStreamingSql(this.session, this.transaction, this.sql, params, this.types) | ||
.switchOnFirst((signal, flux) -> { | ||
if (signal.hasError()) { | ||
return Mono.error(signal.getThrowable()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright 2019 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.spanner.r2dbc; | ||
|
||
import com.google.spanner.v1.Transaction; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import javax.annotation.Nullable; | ||
|
||
/** | ||
* A class to hold transaction-related data. | ||
*/ | ||
public class SpannerTransactionContext { | ||
|
||
private Transaction transaction; | ||
|
||
private AtomicLong seqNum = new AtomicLong(0); | ||
|
||
private SpannerTransactionContext(Transaction transaction) { | ||
this.transaction = transaction; | ||
} | ||
|
||
public Transaction getTransaction() { | ||
return this.transaction; | ||
} | ||
|
||
public long nextSeqNum() { | ||
return this.seqNum.getAndIncrement(); | ||
} | ||
|
||
/** | ||
* Creates the SpannerTransactionContext. | ||
* @param transaction spanner transaction | ||
* @return spanner transaction context | ||
*/ | ||
public static @Nullable SpannerTransactionContext from(Transaction transaction) { | ||
if (transaction == null) { | ||
return null; | ||
} | ||
return new SpannerTransactionContext(transaction); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,9 +17,11 @@ | |
package com.google.cloud.spanner.r2dbc.client; | ||
|
||
import com.google.auth.oauth2.GoogleCredentials; | ||
import com.google.cloud.spanner.r2dbc.SpannerTransactionContext; | ||
import com.google.cloud.spanner.r2dbc.util.ObservableReactiveUtil; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.protobuf.Empty; | ||
import com.google.protobuf.Struct; | ||
import com.google.spanner.v1.BeginTransactionRequest; | ||
import com.google.spanner.v1.CommitRequest; | ||
import com.google.spanner.v1.CommitResponse; | ||
|
@@ -35,10 +37,12 @@ | |
import com.google.spanner.v1.TransactionOptions; | ||
import com.google.spanner.v1.TransactionOptions.ReadWrite; | ||
import com.google.spanner.v1.TransactionSelector; | ||
import com.google.spanner.v1.Type; | ||
import io.grpc.CallCredentials; | ||
import io.grpc.ManagedChannel; | ||
import io.grpc.ManagedChannelBuilder; | ||
import io.grpc.auth.MoreCallCredentials; | ||
import java.util.Map; | ||
import javax.annotation.Nullable; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
|
@@ -155,20 +159,27 @@ public Mono<Void> deleteSession(Session session) { | |
}); | ||
} | ||
|
||
// TODO: add information about parameters being added to signature | ||
@Override | ||
public Flux<PartialResultSet> executeStreamingSql( | ||
Session session, @Nullable Transaction transaction, String sql) { | ||
Session session, @Nullable SpannerTransactionContext transactionContext, String sql, | ||
Struct params, Map<String, Type> types) { | ||
|
||
return Flux.defer(() -> { | ||
ExecuteSqlRequest.Builder executeSqlRequest = | ||
ExecuteSqlRequest.newBuilder() | ||
.setSql(sql) | ||
.setSession(session.getName()); | ||
if (params != null) { | ||
executeSqlRequest | ||
.setParams(params) | ||
.putAllParamTypes(types); | ||
} | ||
|
||
if (transaction != null) { | ||
if (transactionContext != null && transactionContext.getTransaction() != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you should just add a |
||
executeSqlRequest.setTransaction( | ||
TransactionSelector.newBuilder().setId(transaction.getId()).build()); | ||
TransactionSelector.newBuilder().setId(transactionContext.getTransaction().getId()) | ||
.build()); | ||
executeSqlRequest.setSeqno(transactionContext.nextSeqNum()); | ||
} | ||
|
||
return ObservableReactiveUtil.streamingCall( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do
SpannerTransactionContext.from
here rather than in theSpannerStatement
?Or, why not replace the
currentTransaction
inSpannerConnection
withSpannerTransactionContext
?