Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd: prevent ongoing requests being canceled by deadline exceeded #579

Merged
merged 1 commit into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -51,7 +52,7 @@ var (
captureID string
interval uint

defaultContextTimeoutDuration = 30 * time.Second
defaultContext context.Context
)

// cf holds changefeed id, which is used for output only
Expand Down Expand Up @@ -98,7 +99,7 @@ func newCliCommand() *cobra.Command {
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cliPdAddr},
DialTimeout: defaultContextTimeoutDuration,
DialTimeout: 30 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand Down Expand Up @@ -133,8 +134,7 @@ func newCliCommand() *cobra.Command {
if err != nil {
return errors.Annotate(err, "fail to open PD client")
}
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
err = util.CheckClusterVersion(ctx, pdCli, cliPdAddr)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions cmd/client_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func newListCaptureCommand() *cobra.Command {
Use: "list",
Short: "List all captures in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
captures, err := getAllCaptures(ctx)
if err != nil {
return err
Expand Down
43 changes: 10 additions & 33 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -41,8 +37,7 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "pause",
Short: "Pause a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminStop,
Expand All @@ -54,8 +49,7 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "resume",
Short: "Resume a paused replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminResume,
Expand All @@ -67,8 +61,7 @@ func newAdminChangefeedCommand() []*cobra.Command {
Use: "remove",
Short: "Remove a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminRemove,
Expand All @@ -90,8 +83,7 @@ func newListChangefeedCommand() *cobra.Command {
Use: "list",
Short: "List all replication tasks (changefeeds) in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
_, raw, err := cdcEtcdCli.GetChangeFeeds(ctx)
if err != nil {
return err
Expand All @@ -111,8 +103,7 @@ func newQueryChangefeedCommand() *cobra.Command {
Use: "query",
Short: "Query information and status of a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
info, err := cdcEtcdCli.GetChangeFeedInfo(ctx, changefeedID)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
Expand Down Expand Up @@ -152,8 +143,7 @@ func newCreateChangefeedCommand() *cobra.Command {
Short: "Create a new replication task (changefeed)",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
id := uuid.New().String()
if startTs == 0 {
ts, logical, err := pdCli.GetTS(ctx)
Expand Down Expand Up @@ -269,28 +259,15 @@ func newStatisticsChangefeedCommand() *cobra.Command {
Use: "statistics",
Short: "Periodically check and output the status of a replicaiton task (changefeed)",
RunE: func(cmd *cobra.Command, args []string) error {
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctx := defaultContext
tick := time.NewTicker(time.Duration(interval) * time.Second)
lastTime := time.Now()
var lastCount uint64
for {
select {
case sig := <-sc:
switch sig {
case syscall.SIGTERM:
cancel()
os.Exit(0)
default:
cancel()
os.Exit(1)
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return err
}
case <-tick.C:
now := time.Now()
Expand Down
3 changes: 1 addition & 2 deletions cmd/client_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ func newDeleteMetaCommand() *cobra.Command {
Use: "delete",
Short: "Delete all meta data in etcd, confirm that you know what this command will do and use it at your own risk",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
err := cdcEtcdCli.ClearAllCDCInfo(ctx)
if err == nil {
cmd.Println("already truncate all meta in etcd!")
Expand Down
6 changes: 2 additions & 4 deletions cmd/client_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ func newListProcessorCommand() *cobra.Command {
Use: "list",
Short: "List all processors in TiCDC cluster",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
info, err := cdcEtcdCli.GetProcessors(ctx)
if err != nil {
return err
Expand All @@ -41,8 +40,7 @@ func newQueryProcessorCommand() *cobra.Command {
Use: "query",
Short: "Query information and status of a sub replication task (processor)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
_, status, err := cdcEtcdCli.GetTaskStatus(ctx, changefeedID, captureID)
if err != nil && errors.Cause(err) != model.ErrTaskStatusNotExists {
return err
Expand Down
3 changes: 1 addition & 2 deletions cmd/client_tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func newQueryTsoCommand() *cobra.Command {
Use: "query",
Short: "Get tso from PD",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := contextTimeout()
defer cancel()
ctx := defaultContext
ts, logic, err := pdCli.GetTS(ctx)
if err != nil {
return err
Expand Down
19 changes: 19 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -51,6 +54,22 @@ func initLog() error {

// Execute runs the root command
func Execute() {
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defaultContext = ctx
go func() {
sig := <-sc
log.Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}()

if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
20 changes: 1 addition & 19 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package cmd

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -60,22 +57,7 @@ func runEServer(cmd *cobra.Command, args []string) error {
if err != nil {
return errors.Annotate(err, "new server")
}

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := <-sc
log.Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}()

err = server.Run(ctx)
err = server.Run(defaultContext)
if err != nil && errors.Cause(err) != context.Canceled {
log.Error("run server", zap.String("error", errors.ErrorStack(err)))
return errors.Annotate(err, "run server")
Expand Down
4 changes: 0 additions & 4 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
)

func contextTimeout() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultContextTimeoutDuration)
}

func getAllCaptures(ctx context.Context) ([]*capture, error) {
_, raw, err := cdcEtcdCli.GetCaptures(ctx)
if err != nil {
Expand Down