From b15bafbe241fa7f7aa24503122f03ed329ac4b42 Mon Sep 17 00:00:00 2001 From: gunli Date: Tue, 31 Oct 2023 17:16:52 +0800 Subject: [PATCH] [INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK --- .../dataproxy-sdk-golang/dataproxy/client.go | 35 ++++++++----------- .../dataproxy-sdk-golang/dataproxy/worker.go | 2 +- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go index 77d64c7656f..f664ed27af7 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go @@ -22,7 +22,6 @@ import ( "errors" "math" "sync" - "time" "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool" "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/discoverer" @@ -215,16 +214,16 @@ func (c *client) Dial(addr string) (gnet.Conn, error) { } func (c *client) Send(ctx context.Context, msg Message) error { - worker := c.getWorker() - if worker == nil { + worker, err := c.getWorker() + if err != nil { return ErrNoAvailableWorker } return worker.send(ctx, msg) } func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) { - worker := c.getWorker() - if worker == nil { + worker, err := c.getWorker() + if err != nil { if cb != nil { cb(msg, ErrNoAvailableWorker) } @@ -234,23 +233,17 @@ func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) { worker.sendAsync(ctx, msg, cb) } -func (c *client) getWorker() *worker { - for i := 0; i < c.options.WorkerNum; i++ { - index := c.curWorkerIndex.Load() - w := c.workers[index%uint64(len(c.workers))] - c.curWorkerIndex.Add(1) - - if w.available() { - return w - } else if i == c.options.WorkerNum-1 { - c.metrics.incError(errAllWorkerBusy.strCode) - return w - } else { - time.Sleep(1 * time.Millisecond) - continue - } +func (c *client) getWorker() (*worker, error) { + index := c.curWorkerIndex.Load() + w := c.workers[index%uint64(len(c.workers))] + c.curWorkerIndex.Add(1) + + if w.available() { + return w, nil } - return nil + + c.metrics.incError(workerBusy.strCode) + return nil, workerBusy } func (c *client) Close() { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go index be3772c38db..ea58e5376a8 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go @@ -62,7 +62,7 @@ var ( errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid"} errServerError = &errNo{code: 10011, strCode: "10011", message: "server error"} errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic"} - errAllWorkerBusy = &errNo{code: 10013, strCode: "10013", message: "all workers are busy"} + workerBusy = &errNo{code: 10013, strCode: "10013", message: "worker is busy"} errNoMatchReq4Rsp = &errNo{code: 10014, strCode: "10014", message: "no match unacknowledged request for response"} errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: "conn closed by peer"} errUnknown = &errNo{code: 20001, strCode: "20001", message: "unknown"}