Skip to content

Commit

Permalink
[INLONG-9174][SDK] Fail fast when work is unavailable in Golang SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Oct 31, 2023
1 parent f055e00 commit b15bafb
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"errors"
"math"
"sync"
"time"

"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/discoverer"
Expand Down Expand Up @@ -215,16 +214,16 @@ func (c *client) Dial(addr string) (gnet.Conn, error) {
}

func (c *client) Send(ctx context.Context, msg Message) error {
worker := c.getWorker()
if worker == nil {
worker, err := c.getWorker()
if err != nil {
return ErrNoAvailableWorker
}
return worker.send(ctx, msg)
}

func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) {
worker := c.getWorker()
if worker == nil {
worker, err := c.getWorker()
if err != nil {
if cb != nil {
cb(msg, ErrNoAvailableWorker)
}
Expand All @@ -234,23 +233,17 @@ func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) {
worker.sendAsync(ctx, msg, cb)
}

func (c *client) getWorker() *worker {
for i := 0; i < c.options.WorkerNum; i++ {
index := c.curWorkerIndex.Load()
w := c.workers[index%uint64(len(c.workers))]
c.curWorkerIndex.Add(1)

if w.available() {
return w
} else if i == c.options.WorkerNum-1 {
c.metrics.incError(errAllWorkerBusy.strCode)
return w
} else {
time.Sleep(1 * time.Millisecond)
continue
}
func (c *client) getWorker() (*worker, error) {
index := c.curWorkerIndex.Load()
w := c.workers[index%uint64(len(c.workers))]
c.curWorkerIndex.Add(1)

if w.available() {
return w, nil
}
return nil

c.metrics.incError(workerBusy.strCode)
return nil, workerBusy
}

func (c *client) Close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid"}
errServerError = &errNo{code: 10011, strCode: "10011", message: "server error"}
errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic"}
errAllWorkerBusy = &errNo{code: 10013, strCode: "10013", message: "all workers are busy"}
workerBusy = &errNo{code: 10013, strCode: "10013", message: "worker is busy"}
errNoMatchReq4Rsp = &errNo{code: 10014, strCode: "10014", message: "no match unacknowledged request for response"}
errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: "conn closed by peer"}
errUnknown = &errNo{code: 20001, strCode: "20001", message: "unknown"}
Expand Down

0 comments on commit b15bafb

Please sign in to comment.