diff --git a/datastore/integration_test.go b/datastore/integration_test.go index 6a01926449cc..3dfd082c67f0 100644 --- a/datastore/integration_test.go +++ b/datastore/integration_test.go @@ -722,7 +722,15 @@ func TestIntegration_AggregationQueries(t *testing.T) { for i := range keys { keys[i] = IncompleteKey("SQChild", parent) } - keys, err := client.PutMulti(ctx, keys, children) + + // Create transaction with read before creating entities + readTime := time.Now() + txBeforeCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...) + if err != nil { + t.Fatalf("client.NewTransaction: %v", err) + } + + keys, err = client.PutMulti(ctx, keys, children) if err != nil { t.Fatalf("client.PutMulti: %v", err) } @@ -733,13 +741,22 @@ func TestIntegration_AggregationQueries(t *testing.T) { } }() + // Create transaction with read after creating entities + readTime = time.Now() + txAfterCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...) + if err != nil { + t.Fatalf("client.NewTransaction: %v", err) + } + testCases := []struct { - desc string - aggQuery *AggregationQuery - wantFailure bool - wantErrMsg string - wantAggResult AggregationResult + desc string + aggQuery *AggregationQuery + transactionOpts []TransactionOption + wantFailure bool + wantErrMsg string + wantAggResult AggregationResult }{ + { desc: "Count Failure - Missing index", aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T>=", now). @@ -757,6 +774,34 @@ func TestIntegration_AggregationQueries(t *testing.T) { "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}}, }, }, + { + desc: "Aggregations in transaction before creating entities", + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now). + Transaction(txBeforeCreate). + NewAggregationQuery(). + WithCount("count"). + WithSum("I", "sum"). + WithAvg("I", "avg"), + wantAggResult: map[string]interface{}{ + "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}}, + "sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}}, + "avg": &pb.Value{ValueType: &pb.Value_NullValue{NullValue: structpb.NullValue_NULL_VALUE}}, + }, + }, + { + desc: "Aggregations in transaction after creating entities", + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now). + Transaction(txAfterCreate). + NewAggregationQuery(). + WithCount("count"). + WithSum("I", "sum"). + WithAvg("I", "avg"), + wantAggResult: map[string]interface{}{ + "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 8}}, + "sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 28}}, + "avg": &pb.Value{ValueType: &pb.Value_DoubleValue{DoubleValue: 3.5}}, + }, + }, { desc: "Multiple aggregations", aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now). diff --git a/datastore/query.go b/datastore/query.go index c9f9163c310d..e833ff8d8ff4 100644 --- a/datastore/query.go +++ b/datastore/query.go @@ -1026,7 +1026,6 @@ func DecodeCursor(s string) (Cursor, error) { // NewAggregationQuery returns an AggregationQuery with this query as its // base query. func (q *Query) NewAggregationQuery() *AggregationQuery { - q.eventual = true return &AggregationQuery{ query: q, aggregationQueries: make([]*pb.AggregationQuery_Aggregation, 0), diff --git a/datastore/query_test.go b/datastore/query_test.go index 22d13e1776ab..40fc8f327795 100644 --- a/datastore/query_test.go +++ b/datastore/query_test.go @@ -126,11 +126,6 @@ func fakeRunAggregationQuery(req *pb.RunAggregationQueryRequest) (*pb.RunAggrega }, }, }, - ReadOptions: &pb.ReadOptions{ - ConsistencyType: &pb.ReadOptions_ReadConsistency_{ - ReadConsistency: pb.ReadOptions_EVENTUAL, - }, - }, } if !proto.Equal(req, expectedIn) { return nil, fmt.Errorf("unsupported argument: got %v want %v", req, expectedIn)