Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
gcs: compatible with old gcs bug (#677) (#688)
Browse files Browse the repository at this point in the history
* cherry pick #677 to release-4.0

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* resolve conflict

* fix ci

* fix tiflash kill issue

Co-authored-by: 3pointer <luancheng@pingcap.com>
  • Loading branch information
ti-srebot and 3pointer authored Jan 11, 2021
1 parent f3cc9ad commit 5044b6f
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 213 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ bins:
@which bin/tiflash
@which bin/libtiflash_proxy.so
@which bin/cdc
@which bin/fake-gcs-server
if [ ! -d bin/flash_cluster_manager ]; then echo "flash_cluster_manager not exist"; exit 1; fi

tools:
Expand Down
2 changes: 2 additions & 0 deletions go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ require (
google.golang.org/grpc v1.27.1
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
)

replace cloud.google.com/go/storage => github.com/3pointer/google-cloud-go/storage v1.6.1-0.20210108125931-b59bfa0720b2
203 changes: 2 additions & 201 deletions go.sum1

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func NewCluster() (*Cluster, error) {

cluster := mocktikv.NewCluster()
client, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, nil, "")
mocktikv.BootstrapWithSingleStore(cluster)
if err != nil {
return nil, errors.Trace(err)
}
mocktikv.BootstrapWithSingleStore(cluster)
storage, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
if err != nil {
return nil, errors.Trace(err)
Expand Down
52 changes: 49 additions & 3 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ import (
"io"
"io/ioutil"
"path"
"strings"

"cloud.google.com/go/storage"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"go.uber.org/zap"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

berrors "github.com/pingcap/br/pkg/errors"
Expand Down Expand Up @@ -111,8 +115,15 @@ func (s *gcsStorage) Read(ctx context.Context, name string) ([]byte, error) {
}
defer rc.Close()

b := make([]byte, rc.Attrs.Size)
_, err = io.ReadFull(rc, b)
size := rc.Attrs.Size
var b []byte
if size < 0 {
// happened when using fake-gcs-server in integration test
b, err = ioutil.ReadAll(rc)
} else {
b = make([]byte, size)
_, err = io.ReadFull(rc, b)
}
return b, errors.Trace(err)
}

Expand Down Expand Up @@ -171,7 +182,9 @@ func newGCSStorage(ctx context.Context, gcs *backup.GCS, opts *ExternalStorageOp
"You should provide '--gcs.credentials_file' when '--send-credentials-to-tikv' is true")
}
}
clientOps = append(clientOps, option.WithCredentials(creds))
if creds != nil {
clientOps = append(clientOps, option.WithCredentials(creds))
}
} else {
clientOps = append(clientOps, option.WithCredentialsJSON([]byte(gcs.GetCredentialsBlob())))
}
Expand All @@ -193,6 +206,18 @@ func newGCSStorage(ctx context.Context, gcs *backup.GCS, opts *ExternalStorageOp
}

bucket := client.Bucket(gcs.Bucket)
// check whether it's a bug before #647, to solve case #2
// If the storage is set as gcs://bucket/prefix/,
// the backupmeta is written correctly to gcs://bucket/prefix/backupmeta,
// but the SSTs are written wrongly to gcs://bucket/prefix//*.sst (note the extra slash).
// see details about case 2 at https://github.com/pingcap/br/issues/675#issuecomment-753780742
sstInPrefix := hasSSTFiles(ctx, bucket, gcs.Prefix)
sstInPrefixSlash := hasSSTFiles(ctx, bucket, gcs.Prefix+"//")
if sstInPrefixSlash && !sstInPrefix {
// This is a old bug, but we must make it compatible.
// so we need find sst in slash directory
gcs.Prefix += "//"
}
if !opts.SkipCheckPath {
// check bucket exists
_, err = bucket.Attrs(ctx)
Expand All @@ -202,3 +227,24 @@ func newGCSStorage(ctx context.Context, gcs *backup.GCS, opts *ExternalStorageOp
}
return &gcsStorage{gcs: gcs, bucket: bucket}, nil
}

func hasSSTFiles(ctx context.Context, bucket *storage.BucketHandle, prefix string) bool {
query := storage.Query{Prefix: prefix}
_ = query.SetAttrSelection([]string{"Name"})
it := bucket.Objects(ctx, &query)
for {
attrs, err := it.Next()
if err == iterator.Done { // nolint:errorlint
break
}
if err != nil {
log.Warn("failed to list objects on gcs, will use default value for `prefix`", zap.Error(err))
break
}
if strings.HasSuffix(attrs.Name, ".sst") {
log.Info("sst file found in prefix slash", zap.String("file", attrs.Name))
return true
}
}
return false
}
20 changes: 14 additions & 6 deletions pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,26 @@ type BackendOptions struct {
GCS GCSBackendOptions `json:"gcs" toml:"gcs"`
}

// ParseRawURL parse raw url to url object.
func ParseRawURL(rawURL string) (*url.URL, error) {
// https://github.com/pingcap/br/issues/603
// In aws the secret key may contain '/+=' and '+' has a special meaning in URL.
// Replace "+" by "%2B" here to avoid this problem.
rawURL = strings.ReplaceAll(rawURL, "+", "%2B")
u, err := url.Parse(rawURL)
if err != nil {
return nil, errors.Trace(err)
}
return u, nil
}

// ParseBackend constructs a structured backend description from the
// storage URL.
func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBackend, error) {
if len(rawURL) == 0 {
return nil, errors.Annotate(berrors.ErrStorageInvalidConfig, "empty store is not allowed")
}

// https://github.com/pingcap/br/issues/603
// In aws the secret key may contain '/+=' and '+' has a special meaning in URL.
// Replace "+" by "%2B" here to avoid this problem.
rawURL = strings.ReplaceAll(rawURL, "+", "%2B")
u, err := url.Parse(rawURL)
u, err := ParseRawURL(rawURL)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
32 changes: 31 additions & 1 deletion pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"crypto/tls"
"fmt"
"net/url"
"path"
"strings"
"time"

gcs "cloud.google.com/go/storage"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
Expand Down Expand Up @@ -394,7 +396,26 @@ func ReadBackupMeta(
}
metaData, err := s.Read(ctx, fileName)
if err != nil {
return nil, nil, nil, errors.Annotate(err, "load backupmeta failed")
if gcsObjectNotFound(err) {
// change gcs://bucket/abc/def to gcs://bucket/abc and read defbackupmeta
oldPrefix := u.GetGcs().GetPrefix()
newPrefix, file := path.Split(oldPrefix)
newFileName := file + fileName
u.GetGcs().Prefix = newPrefix
s, err = storage.Create(ctx, u, cfg.SendCreds)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
log.Info("retry load metadata in gcs", zap.String("newPrefix", newPrefix), zap.String("newFileName", newFileName))
metaData, err = s.Read(ctx, newFileName)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
// reset prefix for tikv download sst file correctly.
u.GetGcs().Prefix = oldPrefix
} else {
return nil, nil, nil, errors.Annotate(err, "load backupmeta failed")
}
}
backupMeta := &backup.BackupMeta{}
if err = proto.Unmarshal(metaData, backupMeta); err != nil {
Expand Down Expand Up @@ -466,3 +487,12 @@ func normalizePDURL(pd string, useTLS bool) (string, error) {
}
return pd, nil
}

// check whether it's a bug before #647, to solve case #1
// If the storage is set as gcs://bucket/prefix,
// the SSTs are written correctly to gcs://bucket/prefix/*.sst
// but the backupmeta is written wrongly to gcs://bucket/prefixbackupmeta.
// see details https://github.com/pingcap/br/issues/675#issuecomment-753780742
func gcsObjectNotFound(err error) bool {
return errors.Cause(err) == gcs.ErrObjectNotExist // nolint:errorlint
}
2 changes: 1 addition & 1 deletion tests/_utils/run_services
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ cleanup_data() {
}

stop_services() {
for svc in "br" "tidb" "tiflash" "tikv" "pd"; do
for svc in "br" "tidb" "tiflash" "tikv" "pd" "TiFlashMain"; do
# tiflash and br do not have "-server"
killall -9 $svc-server &> /dev/null || true
killall -9 $svc &> /dev/null || true
Expand Down
9 changes: 9 additions & 0 deletions tests/br_gcs/oauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env python3

from flask import Flask

app = Flask(__name__)

@app.route('/oauth/token', methods=['GET', 'POST'])
def oauth():
return '{"access_token": "ok", "token_type":"service_account", "expires_in":3600}'
144 changes: 144 additions & 0 deletions tests/br_gcs/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#!/bin/bash
#
# Copyright 2020 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux
DB="$TEST_NAME"
TABLE="usertable"
DB_COUNT=3

GCS_HOST="localhost"
GCS_PORT=21808
BUCKET="test"

# we need set public-host for download file, or it will return 404 when using client to read.
bin/fake-gcs-server -scheme http -host $GCS_HOST -port $GCS_PORT -backend memory -public-host $GCS_HOST:$GCS_PORT &
GCS_ID=$!
i=0
while ! curl -o /dev/null -v -s "http://$GCS_HOST:$GCS_PORT/"; do
i=$(($i+1))
if [ $i -gt 7 ]; then
echo 'Failed to start gcs-server'
exit 1
fi
sleep 2
done

# start oauth server
killall flask || true
FLASK_APP=tests/$TEST_NAME/oauth.py flask run &

stop_gcs() {
kill -2 $GCS_ID
killall flask || true
}
trap stop_gcs EXIT

rm -rf "$TEST_DIR/$DB"
mkdir -p "$TEST_DIR/$DB"

# start gcs-server
# Fill in the database
for i in $(seq $DB_COUNT); do
run_sql "CREATE DATABASE $DB${i};"
go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB${i}
done

# we need start a oauth server or gcs client will failed to handle request.
KEY=$(cat <<- EOF
{
"type": "service_account",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCT524vzG7uEVtX\nojcHbyQzVwlcaGkg1DWWLT+SufD08UYF0bsfcD0Etrtzo4ggwdxJQy5ygl3TNlcD\nKdelWbVyGfg9/sNB1RDlZYbQb0LVLHKjkVs7JyJsxrLk2e6NqD9ajwTEJUcLAQkj\nxlCcIi51beqrIRlvHjbtGwet/dNnRLSZf+i9SHvB2j64+RVYdnyf/IiLBvYyu7hF\nT6VjlljdbwC4TZ2jpfDL8nHRTiDiV+CX3/iH8MlMEOSM30AO5MPNVCZLlTA9W24a\nKi4NPBBlJLvG2mQELYdbhdM64iMvbPkDRtajJD6ogPB7wUoWbtSke5oOJNyV1HNt\nn91JH/dlAgMBAAECggEAQBwve2GSbfgxD0Xds4e9+dEO2jLZ6uSBS9TWOywFIa9Z\nqlkUUtbMZDgu/buTXJubeFg6EGGo+M4TnmfrNR2zFD/khj7hdS49kinVa5Dmt895\n66Osl3HprpvcXG2IxXd56q+Woc0Ew+TRiOPD+kGowLcB4ubIhw1iQpmWVRlyos6Q\nyvHssolrqOkRK9+1asixgow2Y15HtpXFN3XDIVj3gfdN1Zg80S66bTap1DS+dkJH\nSMgEZRilAjUGzbroqvZCiymlIJP5Jj5L5Wy8Qp/k1ixK10oaPgwvdmwXHX/DZ0vC\nT6XwpIaCYd3/XUWBHvrmQHFucWVPISZRi5WidggzuwKBgQDNHrxKaDrxcrV5Ncgu\npQrtQvTsIUCJGMo5m30X0Ac5CsIssOoQHdtEQW1ehJ8DtJRRb9rdWc4aelXsDUr+\no2m1zyZzM6S7IO2YhGDAo7Uu3fy1r33qYAt6uS/nHaJBpsKcyqqK+0wPDikdPLLx\nBBWZHF6WoswDEUVLQa/hHgpjPwKBgQC4l2/6xShNoobivzk8AE/Acq7PazA8gu4K\nY0UghTBlAst4RvBTURYZ2V3uw0S2FbfwL0/snHhNWZl5XjBX/H9oQmLri5qGOOpf\n9A11p5kd0x1mHDgTm/k7EgoskdXGB5NqXIB7l/3UI8Sk2N1PzHwyJJYfaB+EWTs8\n+LVy99VQWwKBgQCilRwVtiwSOSPSYWi8YCEbEpljmK+4eye/JZmviDpRYk+qcMf1\n4lRr85gm9OO9YiK1sf0+ufH9Vr5IDflFgG1HqFwHsAWANYdd/n9Z8eior1ehAurB\nHUO8EJEBlaGIfA+Bi7pF0w3kWQsJm5USKHSeGbh3ma4vOD8+eWBZBSCirQKBgQCe\n1uEq/sChnXtIXpgXg4Uc6xJ1tZy6VUgUdDulsjZklTUU+KYQa7QC5kKoFCtqK+It\nseiqiDIVDUa9Y0liTQotYwLQAT8kxJEZpF54oZFmUqX3mcy/QvYB2JIcrBkx4I7/\ndT2yHKX1CBpMZ7h41FMCquzrdaO5NTd+Td2FYrGSBQKBgEBnAerHh/NafYlVumlS\nVgouR9IketTegyEyntVyEvENx8OA5ZLMywCIKbPMFZgPR0RgDpyDxKauCU2E09e/\nboN76UOuOg11fknJh7vFbUbzM6BXvXVOTyX9ZtZBQcd5Y3tV+tYD1tHUgurGYWb+\nyHLBMOlXdpn0gZ4rwoIQgzD9\n-----END PRIVATE KEY-----\n",
"client_email": "test@email.com",
"token_uri": "http://localhost:5000/oauth/token"
}
EOF)
# save CREDENTIALS to file
echo $KEY > "tests/$TEST_NAME/config.json"
# export test CREDENTIALS for gcs oauth
export GOOGLE_APPLICATION_CREDENTIALS="tests/$TEST_NAME/config.json"
# create gcs bucket
curl -XPOST http://$GCS_HOST:$GCS_PORT/storage/v1/b -d '{"name":"test"}'
for i in $(seq $DB_COUNT); do
row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done
# new version backup full
echo "backup start..."
run_br --pd $PD_ADDR backup full -s "gcs://$BUCKET/$DB?endpoint=http://$GCS_HOST:$GCS_PORT/storage/v1/"
# old version backup full v4.0.8 and disable check-requirements
echo "v4.0.8 backup start..."
bin/brv4.0.8 --pd $PD_ADDR backup full -s "gcs://$BUCKET/${DB}_old?endpoint=http://$GCS_HOST:$GCS_PORT/storage/v1/" --check-requirements=false
# clean up
for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done
# new version restore full
echo "restore start..."
run_br restore full -s "gcs://$BUCKET/$DB?" --pd $PD_ADDR --gcs.endpoint="http://$GCS_HOST:$GCS_PORT/storage/v1/"
for i in $(seq $DB_COUNT); do
row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done
fail=false
for i in $(seq $DB_COUNT); do
if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then
fail=true
echo "TEST: [$TEST_NAME] fail on database $DB${i}"
fi
echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
done
if $fail; then
echo "TEST: [$TEST_NAME] failed!"
exit 1
else
echo "TEST: [$TEST_NAME] new version successd!"
fi
# clean up
for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done
echo "v4.0.8 version restore start..."
run_br restore full -s "gcs://$BUCKET/${DB}_old" --pd $PD_ADDR --gcs.endpoint="http://$GCS_HOST:$GCS_PORT/storage/v1/"
for i in $(seq $DB_COUNT); do
row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done
fail=false
for i in $(seq $DB_COUNT); do
if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then
fail=true
echo "TEST: [$TEST_NAME] fail on database $DB${i}"
fi
echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
done
if $fail; then
echo "TEST: [$TEST_NAME] failed!"
exit 1
else
echo "TEST: [$TEST_NAME] v4.0.8 version successd!"
fi
12 changes: 12 additions & 0 deletions tests/br_gcs/workload
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
recordcount=5000
operationcount=0
workload=core

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
12 changes: 12 additions & 0 deletions tests/download_tools.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ if [ ! -e "$BIN/minio" ]; then
chmod a+x "$BIN/minio"
fi

if [ ! -e "$BIN/fake-gcs-server" ]; then
echo "Downloading fake-gcs-server..."
curl -L -f -o "$BIN/fake-gcs-server" "http://lease.pingcap.org/fake-gcs-server"
chmod a+x "$BIN/fake-gcs-server"
fi

if [ ! -e "$BIN/brv4.0.8" ]; then
echo "Downloading brv4.0.8..."
curl -L -f -o "$BIN/brv4.0.8" "http://lease.pingcap.org/brv4.0.8"
chmod a+x "$BIN/brv4.0.8"
fi

if [ ! -e "$BIN/cdc" ]; then
echo "Downloading cdc..."
curl -L -f -o "$BIN/cdc.tar.gz" "https://download.pingcap.org/ticdc-nightly-linux-amd64.tar.gz"
Expand Down

0 comments on commit 5044b6f

Please sign in to comment.