From bbe0d4edd114d22d5529425a22b2dc2bff5825af Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Aug 2023 16:25:16 +0800 Subject: [PATCH 1/3] [DNM] try to reproduce 46321 Signed-off-by: lance6716 --- .../backend/local/mockserver/import_server.go | 160 ++++++++++++++++++ .../backend/local/mockserver/main/main.go | 41 +++++ .../backend/local/region_job_test.go | 26 +++ 3 files changed, 227 insertions(+) create mode 100644 br/pkg/lightning/backend/local/mockserver/import_server.go create mode 100644 br/pkg/lightning/backend/local/mockserver/main/main.go diff --git a/br/pkg/lightning/backend/local/mockserver/import_server.go b/br/pkg/lightning/backend/local/mockserver/import_server.go new file mode 100644 index 0000000000000..37defe7955062 --- /dev/null +++ b/br/pkg/lightning/backend/local/mockserver/import_server.go @@ -0,0 +1,160 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mockserver + +import ( + "context" + "fmt" + "io" + + "github.com/pingcap/kvproto/pkg/import_sstpb" + "google.golang.org/grpc/metadata" +) + +type WriteServer struct{} + +func (w *WriteServer) SendAndClose(response *import_sstpb.WriteResponse) error { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) Recv() (*import_sstpb.WriteRequest, error) { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) SetHeader(md metadata.MD) error { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) SendHeader(md metadata.MD) error { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) SetTrailer(md metadata.MD) { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) Context() context.Context { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) SendMsg(m interface{}) error { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) RecvMsg(m interface{}) error { + //TODO implement me + panic("implement me") +} + +type MockImportSSTServer struct{} + +func (m *MockImportSSTServer) SwitchMode(ctx context.Context, request *import_sstpb.SwitchModeRequest) (*import_sstpb.SwitchModeResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) GetMode(ctx context.Context, request *import_sstpb.GetModeRequest) (*import_sstpb.GetModeResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Upload(server import_sstpb.ImportSST_UploadServer) error { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Ingest(ctx context.Context, request *import_sstpb.IngestRequest) (*import_sstpb.IngestResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Compact(ctx context.Context, request *import_sstpb.CompactRequest) (*import_sstpb.CompactResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) SetDownloadSpeedLimit(ctx context.Context, request *import_sstpb.SetDownloadSpeedLimitRequest) (*import_sstpb.SetDownloadSpeedLimitResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Download(ctx context.Context, request *import_sstpb.DownloadRequest) (*import_sstpb.DownloadResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Write(server import_sstpb.ImportSST_WriteServer) error { + fmt.Println("Write") + for { + request, err := server.Recv() + if err == io.EOF { + fmt.Println("EOF") + return nil + } + if err != nil { + fmt.Printf("err: %v\n", err) + return err + } + if meta := request.GetMeta(); meta != nil { + fmt.Printf("meta: %v\n", meta.String()) + } + if batch := request.GetBatch(); batch != nil { + fmt.Printf("chunk: %v\n", batch.String()) + } + err = server.SendAndClose(&import_sstpb.WriteResponse{ + Metas: []*import_sstpb.SSTMeta{ + { + Uuid: []byte("got"), + }, + }, + }) + if err != nil { + fmt.Printf("err: %v\n", err) + return err + } + } +} + +func (m *MockImportSSTServer) RawWrite(server import_sstpb.ImportSST_RawWriteServer) error { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) MultiIngest(ctx context.Context, request *import_sstpb.MultiIngestRequest) (*import_sstpb.IngestResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) DuplicateDetect(request *import_sstpb.DuplicateDetectRequest, server import_sstpb.ImportSST_DuplicateDetectServer) error { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Apply(ctx context.Context, request *import_sstpb.ApplyRequest) (*import_sstpb.ApplyResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) ClearFiles(ctx context.Context, request *import_sstpb.ClearRequest) (*import_sstpb.ClearResponse, error) { + //TODO implement me + panic("implement me") +} diff --git a/br/pkg/lightning/backend/local/mockserver/main/main.go b/br/pkg/lightning/backend/local/mockserver/main/main.go new file mode 100644 index 0000000000000..92d6ef5472ff8 --- /dev/null +++ b/br/pkg/lightning/backend/local/mockserver/main/main.go @@ -0,0 +1,41 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "log" + "net" + "os" + + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockserver" + "google.golang.org/grpc" +) + +func main() { + lis, err := net.Listen("tcp", ":0") + if err != nil { + log.Fatal(err) + } + fmt.Println(lis.Addr().String()) + fmt.Printf("PID: %d\n", os.Getpid()) + + server := grpc.NewServer() + import_sstpb.RegisterImportSSTServer(server, &mockserver.MockImportSSTServer{}) + if err := server.Serve(lis); err != nil { + log.Fatal(err) + } +} diff --git a/br/pkg/lightning/backend/local/region_job_test.go b/br/pkg/lightning/backend/local/region_job_test.go index fd508372ef9f9..a342e61893729 100644 --- a/br/pkg/lightning/backend/local/region_job_test.go +++ b/br/pkg/lightning/backend/local/region_job_test.go @@ -16,6 +16,7 @@ package local import ( "context" + "fmt" "sync" "testing" "time" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/stretchr/testify/require" + "google.golang.org/grpc" ) func TestIsIngestRetryable(t *testing.T) { @@ -257,3 +259,27 @@ func TestRegionJobRetryer(t *testing.T) { cancel() jobWg.Wait() } + +func TestWriteWhenServerIsGone(t *testing.T) { + //t.Skip("need mock server") + + ctx := context.Background() + conn, err := grpc.DialContext(ctx, "[::]:40401", grpc.WithInsecure()) + require.NoError(t, err) + for { + client := sst.NewImportSSTClient(conn) + stream, err := client.Write(ctx) + require.NoError(t, err) + err = stream.Send(&sst.WriteRequest{ + Chunk: &sst.WriteRequest_Meta{ + Meta: &sst.SSTMeta{ + Uuid: []byte("test"), + }, + }, + }) + require.NoError(t, err) + resp, err := stream.CloseAndRecv() + require.NoError(t, err) + fmt.Printf("resp: %v\n", resp) + } +} From 7e29a1b37b16bfb92a3e970282622db6cadba78a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Aug 2023 18:04:36 +0800 Subject: [PATCH 2/3] add client logic Signed-off-by: lance6716 --- .../backend/local/mockserver/main2/main.go | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 br/pkg/lightning/backend/local/mockserver/main2/main.go diff --git a/br/pkg/lightning/backend/local/mockserver/main2/main.go b/br/pkg/lightning/backend/local/mockserver/main2/main.go new file mode 100644 index 0000000000000..b9a0cf2d4324b --- /dev/null +++ b/br/pkg/lightning/backend/local/mockserver/main2/main.go @@ -0,0 +1,56 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "fmt" + "log" + "os" + + sst "github.com/pingcap/kvproto/pkg/import_sstpb" + "google.golang.org/grpc" +) + +func main() { + ctx := context.Background() + + conn, err := grpc.DialContext(ctx, os.Args[1], grpc.WithInsecure()) + if err != nil { + log.Fatal(err) + } + for { + client := sst.NewImportSSTClient(conn) + stream, err := client.Write(ctx) + if err != nil { + log.Fatal(err) + } + err = stream.Send(&sst.WriteRequest{ + Chunk: &sst.WriteRequest_Meta{ + Meta: &sst.SSTMeta{ + Uuid: []byte("test"), + }, + }, + }) + if err != nil { + log.Fatal(err) + } + resp, err := stream.CloseAndRecv() + if err != nil { + log.Fatal(err) + } + fmt.Printf("resp: %v\n", resp) + } +} From 8f4ca49c760370d9552d4abca34f751cdf5bbb47 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 24 Aug 2023 10:55:18 +0800 Subject: [PATCH 3/3] update log Signed-off-by: lance6716 --- .../backend/local/mockserver/main/main.go | 3 ++- .../backend/local/mockserver/main2/main.go | 14 +++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/br/pkg/lightning/backend/local/mockserver/main/main.go b/br/pkg/lightning/backend/local/mockserver/main/main.go index 92d6ef5472ff8..47cb4628bc891 100644 --- a/br/pkg/lightning/backend/local/mockserver/main/main.go +++ b/br/pkg/lightning/backend/local/mockserver/main/main.go @@ -26,7 +26,8 @@ import ( ) func main() { - lis, err := net.Listen("tcp", ":0") + portStr := os.Args[1] + lis, err := net.Listen("tcp", ":"+portStr) if err != nil { log.Fatal(err) } diff --git a/br/pkg/lightning/backend/local/mockserver/main2/main.go b/br/pkg/lightning/backend/local/mockserver/main2/main.go index b9a0cf2d4324b..7306ba71a43c1 100644 --- a/br/pkg/lightning/backend/local/mockserver/main2/main.go +++ b/br/pkg/lightning/backend/local/mockserver/main2/main.go @@ -17,10 +17,11 @@ package main import ( "context" "fmt" - "log" "os" sst "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/log" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -29,13 +30,14 @@ func main() { conn, err := grpc.DialContext(ctx, os.Args[1], grpc.WithInsecure()) if err != nil { - log.Fatal(err) + log.Fatal("fail to dial", zap.Error(err)) } for { client := sst.NewImportSSTClient(conn) stream, err := client.Write(ctx) if err != nil { - log.Fatal(err) + log.Error("fail to write", zap.Error(err)) + continue } err = stream.Send(&sst.WriteRequest{ Chunk: &sst.WriteRequest_Meta{ @@ -45,11 +47,13 @@ func main() { }, }) if err != nil { - log.Fatal(err) + log.Error("fail to send", zap.Error(err)) + continue } resp, err := stream.CloseAndRecv() if err != nil { - log.Fatal(err) + log.Error("fail to close and recv", zap.Error(err)) + continue } fmt.Printf("resp: %v\n", resp) }