Skip to content

Commit

Permalink
executor: clean exit in some operators (pingcap#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jan 4, 2022
1 parent f9ea138 commit ccb5f40
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion executor/runtime/benchmark/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ type opReceive struct {

running bool
binlogClient pb.TestService_FeedBinlogClient

ctx context.Context
cancel context.CancelFunc
}

func (o *opReceive) NextWantedInputIdx() int { return runtime.DontNeedData }

func (o *opReceive) Close() error {
o.cancel()
return o.conn.Close()
}

Expand All @@ -125,6 +129,7 @@ func (o *opReceive) dial() (client pb.TestServiceClient, err error) {
}

func (o *opReceive) Prepare(_ *runtime.TaskContext) error {
o.ctx, o.cancel = context.WithCancel(context.Background())
return nil
}

Expand All @@ -135,7 +140,7 @@ func (o *opReceive) connect() error {
}
// start receiving data
// TODO: implement recover from a gtid point during failover.
o.binlogClient, err = client.FeedBinlog(context.Background(), &pb.TestBinlogRequest{Gtid: 0})
o.binlogClient, err = client.FeedBinlog(o.ctx, &pb.TestBinlogRequest{Gtid: 0})
if err != nil {
return errors.New("conn failed")
}
Expand Down Expand Up @@ -339,6 +344,7 @@ type opBinlog struct {

func (o *opBinlog) Close() error {
o.server.Stop()
close(o.binlogChan)
return nil
}

Expand Down

0 comments on commit ccb5f40

Please sign in to comment.