spanner: Race condition in (*BatchReadOnlyTransaction).Execute() #1895

Closed
lhecker opened this issue Apr 1, 2020 · 1 comment
Labels
api: spanner Issues related to the Spanner API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@lhecker
Contributor

lhecker commented Apr 1, 2020

Client

Spanner

Environment

Any

Code

The following code counts the total number of rows using PartitionRead().
It'll exhibit the race condition by showing varying total counts at the end of each run.
Of course this only works if your table has more than one partition.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/spanner"
)

func main() {
	client, err := spanner.NewClient(context.Background(), "projects/<project>/instances/<instance>/databases/<database>")
	if err != nil {
		fmt.Println(err)
		return
	}

	txn, err := client.BatchReadOnlyTransaction(context.Background(), spanner.StrongRead())
	if err != nil {
		fmt.Println(err)
		return
	}
	defer txn.Cleanup(context.Background())

	partitions, err := txn.PartitionRead(context.Background(), "<table>", spanner.AllKeys(), []string{"<column>"}, spanner.PartitionOptions{})
	if err != nil {
		fmt.Println(err)
		return
	}

	wg := &sync.WaitGroup{}
	var total uint64

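	for _, p := range partitions {
		p := p

		// Register the goroutine so that wg.Wait() below actually blocks until it is done.
		wg.Add(1)
		go func() {
			defer wg.Done()

			iter := txn.Execute(context.Background(), p)
			defer iter.Stop()

			var count uint64
			// Use a goroutine-local err; assigning to the outer err would itself be a data race.
			err := iter.Do(func(row *spanner.Row) error {
				count++
				return nil
			})
			if err != nil {
				fmt.Println(err)
				return
			}

			atomic.AddUint64(&total, count)
		}()
	}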

	wg.Wait()
	fmt.Println(total)
}

Expected behavior

PartitionRead() reads all rows.

Actual behavior

A race condition. 😢

Cause

Each partition points to the same rreq/qreq.
For rreq, for instance, see line 137 (rreq: req) in the following code section:

// Prepare ReadRequest.
req := &sppb.ReadRequest{
	Session:     sid,
	Transaction: ts,
	Table:       table,
	Index:       index,
	Columns:     columns,
	KeySet:      kset,
}
// Generate partitions.
for _, p := range resp.GetPartitions() {
	partitions = append(partitions, &Partition{
		pt:   p.PartitionToken,
		rreq: req,
	})
}

During the call to Execute(), the rreq/qreq fields are then mutated without locks of any kind.
For rreq, for instance, see line 264 (p.rreq.PartitionToken = p.pt) in the following code section:

// Read or query partition.
if p.rreq != nil {
	p.rreq.PartitionToken = p.pt
	rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
		p.rreq.ResumeToken = resumeToken
		return client.StreamingRead(ctx, p.rreq)
	}
} else {
	p.qreq.PartitionToken = p.pt
	rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
		p.qreq.ResumeToken = resumeToken
		return client.ExecuteStreamingSql(ctx, p.qreq)
	}
}
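
To make the aliasing concrete, here is a minimal, self-contained sketch of how the two excerpts above interact. The types readRequest and partition are hypothetical stand-ins that only mirror the quoted fields; they are not the library's real definitions, and the writes happen sequentially here so the output is deterministic.

```go
package main

import "fmt"

// readRequest is a hypothetical stand-in for sppb.ReadRequest.
type readRequest struct {
	Table          string
	PartitionToken []byte
}

// partition is a hypothetical stand-in for spanner.Partition.
type partition struct {
	pt   []byte
	rreq *readRequest
}

// buildPartitions mirrors the quoted loop: every partition gets the same req pointer.
func buildPartitions(tokens ...string) []*partition {
	req := &readRequest{Table: "t"}
	var partitions []*partition
	for _, tok := range tokens {
		partitions = append(partitions, &partition{pt: []byte(tok), rreq: req})
	}
	return partitions
}

func main() {
	ps := buildPartitions("a", "b")
	fmt.Println(ps[0].rreq == ps[1].rreq) // true: one shared request

	// Preparing partition "b" overwrites the token that partition "a" would send.
	ps[0].rreq.PartitionToken = ps[0].pt
	ps[1].rreq.PartitionToken = ps[1].pt
	fmt.Println(string(ps[0].rreq.PartitionToken)) // b
}
```

When the two writes come from different goroutines instead, which token each RPC ends up sending depends on scheduling, which would explain the varying row counts.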

The documentation of that function also doesn't mention that it's not safe for concurrent use, nor that it mutates its argument.

I suppose the struct field pt exists because this was an attempt to reduce the data size after a call to MarshalBinary()?
In that case I'd like to kindly suggest simply copying the rreq/qreq fields within the Execute() method and mutating only the copy. 🙂
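
A rough sketch of that suggestion, again with hypothetical stand-in types rather than the real spanner.Partition and sppb.ReadRequest: execute copies the shared request and sets the token on the copy only. This is an illustration of the idea, not the fix that was actually merged; for real protobuf messages a plain struct copy is presumably not appropriate and a deep copy such as proto.Clone would likely be used instead.

```go
package main

import (
	"fmt"
	"sync"
)

// readRequest is a hypothetical stand-in for sppb.ReadRequest.
type readRequest struct {
	Table          string
	PartitionToken []byte
}

// partition is a hypothetical stand-in for spanner.Partition.
type partition struct {
	pt   []byte
	rreq *readRequest
}

// execute copies the shared request and mutates only the copy. It returns
// the token it would send instead of issuing an RPC.
func execute(p *partition) string {
	req := *p.rreq // shallow copy; the shared request stays untouched
	req.PartitionToken = p.pt
	return string(req.PartitionToken)
}

// runAll calls execute concurrently for every partition and collects the
// token each call would have sent, indexed by partition.
func runAll(partitions []*partition) []string {
	sent := make([]string, len(partitions))
	var wg sync.WaitGroup
	for i, p := range partitions {
		wg.Add(1)
		go func(i int, p *partition) {
			defer wg.Done()
			sent[i] = execute(p) // each goroutine writes its own slot
		}(i, p)
	}
	wg.Wait()
	return sent
}

func main() {
	shared := &readRequest{Table: "t"}
	partitions := []*partition{
		{pt: []byte("a"), rreq: shared},
		{pt: []byte("b"), rreq: shared},
		{pt: []byte("c"), rreq: shared},
	}
	fmt.Println(runAll(partitions))           // [a b c]
	fmt.Println(shared.PartitionToken == nil) // true
}
```

Because the shared request is only read, never written, the goroutines no longer interfere with each other, and each one sends the token of its own partition.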

Additional context

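Interim workaround (round-trip each partition through MarshalBinary()/UnmarshalBinary() before passing it to Execute()):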

data, err := p.MarshalBinary()
if err != nil {
	return
}

p := &spanner.Partition{}
err = p.UnmarshalBinary(data)
if err != nil {
	return
}

👉 Should I submit a fix?
I already did so in the past, but last time you told me to open an issue beforehand. 😄

@lhecker lhecker added the triage me I really want to be triaged. label Apr 1, 2020
@codyoss codyoss added api: spanner Issues related to the Spanner API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. and removed triage me I really want to be triaged. labels Apr 1, 2020
@olavloite olavloite self-assigned this Apr 2, 2020
@olavloite
Contributor

@lhecker Thanks, that's a great catch. And thank you for the very elaborate report and analysis.

There's a fix coming here: https://code-review.googlesource.com/c/gocloud/+/54190
