Skip to content

Commit

Permalink
[INLONG-9180][SDK] Cache up batchReq.dataReqs Golang SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Oct 31, 2023
1 parent f055e00 commit 2b3956d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func init() {
}
batchPool = &sync.Pool{
New: func() interface{} {
return &batchReq{}
return &batchReq{
dataReqs: make([]*sendDataReq, 0, 50),
}
},
}
}
Expand Down Expand Up @@ -109,12 +111,17 @@ func (b *batchReq) append(req *sendDataReq) {

func (b *batchReq) done(err error) {
errorCode := getErrorCode(err)
for _, req := range b.dataReqs {
for i, req := range b.dataReqs {
req.done(err, errorCode)
b.dataReqs[i] = nil
}
if b.dataReqs != nil {
b.dataReqs = b.dataReqs[:0]
}

if b.callback != nil {
b.callback()
b.callback = nil
}

if b.buffer != nil && b.bufferPool != nil {
Expand All @@ -128,10 +135,12 @@ func (b *batchReq) done(err error) {
}
b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds())
b.metrics.observeSize(errorCode, b.dataSize)
b.metrics = nil
}

if b.pool != nil {
b.pool.Put(b)
b.pool = nil
}
}

Expand Down Expand Up @@ -334,25 +343,29 @@ type sendDataReq struct {
func (s *sendDataReq) done(err error, errCode string) {
if s.semaphore != nil {
s.semaphore.Release()
if s.metrics != nil {
s.metrics.decPending(s.workerID)
}
s.semaphore = nil
}

if s.callback != nil {
s.callback(s.msg, err)
s.callback = nil
}

if s.metrics != nil {
if s.semaphore != nil {
s.metrics.decPending(s.workerID)
}
if errCode == "" {
errCode = getErrorCode(err)
}

s.metrics.incMessage(errCode)
s.metrics = nil
}

if s.pool != nil {
s.pool.Put(s)
s.pool = nil
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,17 @@ func (w *worker) handleSendData(req *sendDataReq) {
if !ok {
streamID := req.msg.StreamID
batch = batchPool.Get().(*batchReq)
dataReqs := batch.dataReqs
if dataReqs == nil {
dataReqs = make([]*sendDataReq, 0, w.options.BatchingMaxMessages)
}
*batch = batchReq{
pool: batchPool,
workerID: w.indexStr,
batchID: util.SnowFlakeID(),
groupID: w.options.GroupID,
streamID: streamID,
dataReqs: make([]*sendDataReq, 0, w.options.BatchingMaxMessages),
dataReqs: dataReqs,
batchTime: time.Now(),
retries: 0,
bufferPool: w.bufferPool,
Expand Down

0 comments on commit 2b3956d

Please sign in to comment.