Commit

rebase on top of the new queue interface
dmitryax committed Oct 26, 2023
1 parent 7c8d4f3 commit 7f98490
Showing 4 changed files with 93 additions and 95 deletions.
73 changes: 33 additions & 40 deletions exporter/exporterhelper/batch_sender.go
@@ -15,7 +15,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/request"
)

// MergeBatcherConfig defines a basic configuration for batching requests based on a timeout and a minimum number of
@@ -94,7 +95,7 @@ type BatchConfigBatchersLimit struct {
// Do not mutate the requests passed to the function if error can be returned after mutation.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeFunc func(context.Context, Request, Request) (Request, error)
type BatchMergeFunc func(context.Context, request.Request, request.Request) (request.Request, error)

// BatchMergeSplitFunc is a function that merge and/or splits a request into multiple requests based on the provided
// limit of maximum number of items. All the returned requests MUST have a number of items that does not exceed the
@@ -104,18 +105,18 @@ type BatchMergeFunc func(context.Context, Request, Request) (Request, error)
// greater than 0. Context will be propagated from the original request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeSplitFunc func(ctx context.Context, optionalReq Request, req Request, maxItems int) ([]Request, error)
type BatchMergeSplitFunc func(ctx context.Context, optionalReq request.Request, req request.Request, maxItems int) ([]request.Request, error)

// IdentifyBatchFunc returns an identifier for a Request batch. This function can be used to separate particular
// Requests into different batches. Batches with different identifiers will not be merged together.
// Provided context can be used to extract information from the context and use it as a part of the identifier as well.
// This function is optional. If not provided, all Requests will be batched together.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type IdentifyBatchFunc func(ctx context.Context, r Request) string
type IdentifyBatchFunc func(ctx context.Context, r request.Request) string

type shardGetter interface {
shard(*request) (*shard, error)
shard(*intrequest.Request) (*shard, error)
}

type Batcher struct {
@@ -129,7 +130,7 @@ type Batcher struct {
goroutines sync.WaitGroup
logger *zap.Logger

export func(*request) error
export func(*intrequest.Request) error
}

// shutdown is invoked during service shutdown.
@@ -144,26 +145,23 @@ func (b *Batcher) shutdown() {
func (b *Batcher) newShard() *shard {
s := &shard{
batcher: b,
newRequest: make(chan *request, runtime.NumCPU()),
newRequest: make(chan *intrequest.Request, runtime.NumCPU()),
}
b.goroutines.Add(1)
go s.start()
return s
}

func (b *Batcher) mergeRequests(req1 *request, req2 *request) (*request, error) {
func (b *Batcher) mergeRequests(req1 *intrequest.Request, req2 *intrequest.Request) (*intrequest.Request, error) {
r, err := b.mergeFunc(req1.Context(), req1.Request, req2.Request)
if err != nil {
return nil, err
}
return &request{
baseRequest: baseRequest{ctx: req1.Context()},
Request: r,
}, nil
return intrequest.New(req1.Context(), r), nil
}

func (b *Batcher) splitMergeRequest(optReq *request, req *request, maxItems int) ([]*request, error) {
var optionalReq Request
func (b *Batcher) splitMergeRequest(optReq *intrequest.Request, req *intrequest.Request, maxItems int) ([]*intrequest.Request, error) {
var optionalReq request.Request
if optReq != nil {
optionalReq = optReq.Request
}
@@ -176,12 +174,9 @@ func (b *Batcher) splitMergeRequest(optReq *request, req *request, maxItems int)

}

reqs := make([]*request, 0, len(rs))
reqs := make([]*intrequest.Request, 0, len(rs))
for _, r := range rs {
reqs = append(reqs, &request{
baseRequest: baseRequest{ctx: req.Context()},
Request: r,
})
reqs = append(reqs, intrequest.New(req.Context(), r))
}
return reqs, nil
}
@@ -241,40 +236,38 @@ var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batch i
// - batch size reaches cfg.SendBatchSize
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
type batchSender struct {
baseRequestSender
nextSender *baseRequestSender

// batcher will be either *singletonBatcher or *multiBatcher
batcher *Batcher
}

// newBatchSender returns a new batch consumer component.
func newBatchSender(set exporter.CreateSettings, b *Batcher) requestSender {
if b == nil {
return &baseRequestSender{}
}
func newBatchSender(set exporter.CreateSettings, b *Batcher, nextSender *baseRequestSender) requestSender {
b.logger = set.Logger
return &batchSender{
batcher: b,
batcher: b,
nextSender: nextSender,
}
}

// start is invoked during service startup.
func (bs *batchSender) start(_ context.Context, _ component.Host, _ exporter.CreateSettings) error {
bs.batcher.export = func(req *request) error {
func (bs *batchSender) start(_ context.Context, _ component.Host) error { // nolint: unparam
bs.batcher.export = func(req *intrequest.Request) error {
return bs.nextSender.send(req)
}
return nil
}

func (bs *batchSender) send(req internal.Request) error {
s, err := bs.batcher.shard(req.(*request))
func (bs *batchSender) send(req *intrequest.Request) error {
s, err := bs.batcher.shard(req)
if err != nil {
return err
}

// For now the batcher can work asynchronously only. Potentially we can
// add a sync mode later.
s.newRequest <- req.(*request)
s.newRequest <- req
return nil
}

@@ -292,13 +285,13 @@ type shard struct {
timer *time.Timer

// newRequest is used to receive batches from producers.
newRequest chan *request
newRequest chan *intrequest.Request

// batch is an in-flight data item containing one of the
// underlying data types.
batch *request
batch *intrequest.Request

processRequest func(req *request)
processRequest func(req *intrequest.Request)
}

func (s *shard) start() {
Expand Down Expand Up @@ -341,7 +334,7 @@ func (s *shard) start() {
}

// processRequestFunc returns a function used for request processing based on the batcher configuration.
func (s *shard) processRequestFunc() func(req *request) {
func (s *shard) processRequestFunc() func(req *intrequest.Request) {
if s.batcher.cfg.MaxSizeItems == 0 {
if s.batcher.cfg.MinSizeItems == 0 || !s.hasTimer() {
return s.processRequestPassThrough
@@ -351,11 +344,11 @@ func (s *shard) processRequestFunc() func(req *request) {
return s.processRequestMergeSplit
}

func (s *shard) processRequestPassThrough(req *request) {
func (s *shard) processRequestPassThrough(req *intrequest.Request) {
s.export(req)
}

func (s *shard) processRequestMerge(req *request) {
func (s *shard) processRequestMerge(req *intrequest.Request) {
if s.batch == nil {
s.batch = req
} else {
@@ -369,7 +362,7 @@ func (s *shard) processRequestMerge(req *request) {
s.exporterBatchIfMinSizeReached()
}

func (s *shard) processRequestMergeSplit(req *request) {
func (s *shard) processRequestMergeSplit(req *intrequest.Request) {
reqs, err := s.batcher.splitMergeRequest(s.batch, req, s.batcher.cfg.MaxSizeItems)
if err != nil {
s.batcher.logger.Error("failed to split request", zap.Error(err))
Expand All @@ -392,7 +385,7 @@ func (s *shard) exporterBatchIfMinSizeReached() {
}
}

func (s *shard) export(req *request) {
func (s *shard) export(req *intrequest.Request) {
err := s.batcher.export(req)
// TODO: Replace with metrics and logging.
if err != nil {
@@ -422,7 +415,7 @@ type singleShardGetter struct {
singleShard *shard
}

func (sb *singleShardGetter) shard(_ *request) (*shard, error) {
func (sb *singleShardGetter) shard(_ *intrequest.Request) (*shard, error) {
return sb.singleShard, nil
}

@@ -439,7 +432,7 @@ type multiShardGetter struct {
size int
}

func (mb *multiShardGetter) shard(req *request) (*shard, error) {
func (mb *multiShardGetter) shard(req *intrequest.Request) (*shard, error) {
id := mb.identifyBatchFunc(req.Context(), req.Request)
s, ok := mb.batchers.Load(id)
if ok {
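To illustrate the new exported signatures above, here is a minimal sketch (not part of this commit) of a BatchMergeFunc and a BatchMergeSplitFunc written against the request-based API. The Request stand-in, the myRequest type, and its items field are assumptions for illustration only; a real request type must also implement whatever methods the collector's request.Request interface actually requires, which are not visible in this diff.

package example

import (
	"context"
	"errors"
)

// Request stands in for go.opentelemetry.io/collector/exporter/exporterhelper/request.Request,
// whose exact method set is not shown in this diff.
type Request interface{}

// myRequest is a hypothetical concrete request carrying a slice of items.
type myRequest struct {
	items []string
}

// mergeRequests mirrors the BatchMergeFunc signature: it merges two requests
// of the same concrete type into a single request.
func mergeRequests(_ context.Context, r1 Request, r2 Request) (Request, error) {
	first, ok1 := r1.(*myRequest)
	second, ok2 := r2.(*myRequest)
	if !ok1 || !ok2 {
		return nil, errors.New("unexpected request type")
	}
	merged := append(append([]string{}, first.items...), second.items...)
	return &myRequest{items: merged}, nil
}

// mergeSplitRequests mirrors the BatchMergeSplitFunc signature: it merges the
// optional in-flight batch with the incoming request and splits the result
// into chunks of at most maxItems items. maxItems is assumed to be greater
// than zero, as the batcher only calls this path when cfg.MaxSizeItems > 0.
func mergeSplitRequests(ctx context.Context, optional Request, req Request, maxItems int) ([]Request, error) {
	merged := req
	if optional != nil {
		var err error
		merged, err = mergeRequests(ctx, optional, req)
		if err != nil {
			return nil, err
		}
	}
	mergedReq, ok := merged.(*myRequest)
	if !ok {
		return nil, errors.New("unexpected request type")
	}
	items := mergedReq.items
	out := make([]Request, 0, (len(items)+maxItems-1)/maxItems)
	for start := 0; start < len(items); start += maxItems {
		end := start + maxItems
		if end > len(items) {
			end = len(items)
		}
		out = append(out, &myRequest{items: items[start:end]})
	}
	return out, nil
}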
58 changes: 30 additions & 28 deletions exporter/exporterhelper/batch_sender_test.go
@@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/request"
)

func TestMergeBatcherConfigValidate(t *testing.T) {
@@ -73,12 +75,12 @@ func TestBatcherSingleShardMerge(t *testing.T) {

sink := newFakeRequestSink()

require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink})))
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink})))
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink})))
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink,
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 8, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 3, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 3, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 3, sink: sink,
mergeErr: errors.New("merge error")})))
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 1, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 1, sink: sink})))

// the first two requests should be merged into one and sent by reaching the minimum items size
assert.Eventually(t, func() bool {
@@ -98,7 +100,7 @@ func TestBatcherMultiShardMerge(t *testing.T) {
mergeCfg := NewDefaultMergeBatcherConfig()
mergeCfg.MinSizeItems = 10
mergeCfg.Timeout = 100 * time.Millisecond
batchIdentifierOpt := WithRequestBatchIdentifier(func(_ context.Context, req Request) string {
batchIdentifierOpt := WithRequestBatchIdentifier(func(_ context.Context, req request.Request) string {
return req.(*fakeRequest).batchID
}, BatchConfigBatchersLimit{BatchersLimit: 2})

@@ -129,23 +131,23 @@ func TestBatcherMultiShardMerge(t *testing.T) {

sink := newFakeRequestSink()

require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink,
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 8, sink: sink,
batchID: "1"}))) // batch 1
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink,
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 3, sink: sink,
batchID: "2"}))) // batch 2
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink,
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 3, sink: sink,
batchID: "1"}))) // batch 1

// batch 1 should be sent by reaching the minimum items size
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11
}, 50*time.Millisecond, 10*time.Millisecond)

require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 9, sink: sink,
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 9, sink: sink,
batchID: "2"}))) // batch 2
require.Error(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink,
require.Error(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 3, sink: sink,
batchID: "3"}))) // batchers limit reached
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 2, sink: sink,
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 2, sink: sink,
batchID: "2"}))) // batch 3

// batch 2 should be sent by reaching the minimum items size
@@ -172,7 +174,7 @@ func TestBatchSenderShutdown(t *testing.T) {
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

sink := newFakeRequestSink()
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 3, sink: sink})))

require.NoError(t, be.Shutdown(context.Background()))

@@ -199,36 +201,36 @@ func TestBatcherSingleShardMergeOrSplit(t *testing.T) {
sink := newFakeRequestSink()

// should be sent right away by reaching the minimum items size.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 8, sink: sink})))

// should be left in the queue because it doesn't reach the minimum items size.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 3, sink: sink})))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
}, 50*time.Millisecond, 10*time.Millisecond)

// big request should be merged with the second request and broken down into two requests
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 17, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 17, sink: sink})))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 28
}, 50*time.Millisecond, 10*time.Millisecond)

// big request should be broken down into two requests, both are sent right away by reaching the minimum items size.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 17, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 17, sink: sink})))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 45
}, 50*time.Millisecond, 10*time.Millisecond)

// request that cannot be split should be dropped.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 11, sink: sink,
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 11, sink: sink,
splitErr: errors.New("split error")})))

// big request should be broken down into two requests, only the first one is sent right away by reaching the minimum items size.
// the second one should be sent by reaching the timeout.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 13, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 13, sink: sink})))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 6 && sink.itemsCount.Load() == 55
@@ -274,10 +276,10 @@ func TestBatcherNoTimeout(t *testing.T) {

sink := newFakeRequestSink()
// should be sent almost right away as two requests even it doesn't reach the minimum items size.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 12, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 12, sink: sink})))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == tt.expectedBatches && sink.itemsCount.Load() == 12
}, 50*time.Millisecond, 10*time.Millisecond)
}, 100*time.Millisecond, 10*time.Millisecond)
})
}
}
@@ -298,19 +300,19 @@ func TestBatcherDisabled(t *testing.T) {

sink := newFakeRequestSink()
// should be sent right away because batching is disabled.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 8, sink: sink})))
assert.Equal(t, int64(1), sink.requestsCount.Load())
assert.Equal(t, int64(8), sink.itemsCount.Load())
}

func TestBatcherInvalidMergeSplitFunc(t *testing.T) {
invalidMergeSplitFunc := func(_ context.Context, _ Request, req2 Request, _ int) ([]Request, error) {
invalidMergeSplitFunc := func(_ context.Context, _ request.Request, req2 request.Request, _ int) ([]request.Request, error) {
// reply with invalid 0 length slice if req2 is more than 20 items
if req2.(*fakeRequest).items > 20 {
return []Request{}, nil
if req2.(*fakeRequest).Items > 20 {
return []request.Request{}, nil
}
// otherwise reply with a single request.
return []Request{req2}, nil
return []request.Request{req2}, nil
}
mergeCfg := NewDefaultMergeBatcherConfig()
mergeCfg.Timeout = 50 * time.Millisecond
@@ -327,9 +329,9 @@ func TestBatcherInvalidMergeSplitFunc(t *testing.T) {

sink := newFakeRequestSink()
// first request should be ignored due to invalid merge/split function.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 30, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 30, sink: sink})))
// second request should be sent after reaching the timeout.
require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 15, sink: sink})))
require.NoError(t, be.send(intrequest.New(context.Background(), &fakeRequest{Items: 15, sink: sink})))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15
}, 100*time.Millisecond, 10*time.Millisecond)
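Relatedly, TestBatcherMultiShardMerge above implies the following shape for an IdentifyBatchFunc under the updated signature. This is a sketch only: the Request stand-in, the tenantRequest type, and its tenant field are hypothetical. In the test, the equivalent function returns req.(*fakeRequest).batchID and is registered via WithRequestBatchIdentifier together with BatchConfigBatchersLimit{BatchersLimit: 2}, which caps the number of concurrently tracked batch identifiers.

package example

import "context"

// Request stands in for the collector's request.Request interface; its real
// method set is not shown in this diff.
type Request interface{}

// tenantRequest is a hypothetical request type carrying a tenant identifier.
type tenantRequest struct {
	tenant string
}

// identifyBatch mirrors the IdentifyBatchFunc signature: requests returning
// the same identifier are merged into the same batch, while different
// identifiers are routed to different shards.
func identifyBatch(_ context.Context, req Request) string {
	if r, ok := req.(*tenantRequest); ok {
		return r.tenant
	}
	return "" // unknown request types fall back to a single shared batch
}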
