diff --git a/processor/memorylimiterprocessor/README.md b/processor/memorylimiterprocessor/README.md
index 0e896d0e271..e0e4f22a22e 100644
--- a/processor/memorylimiterprocessor/README.md
+++ b/processor/memorylimiterprocessor/README.md
@@ -13,21 +13,32 @@ on the configured processors, it is important to put checks in place regarding
 memory usage.
 
 The memory_limiter processor allows to perform periodic checks of memory
-usage if it exceeds defined limits will begin dropping data and forcing GC to reduce
+usage and, if it exceeds the defined limits, will begin refusing data and forcing GC to reduce
 memory consumption.
 
 The memory_limiter uses soft and hard memory limits. Hard limit is always above or equal
 the soft limit.
 
-When the memory usage exceeds the soft limit the processor will start dropping the data and
-return errors to the preceding component it in the pipeline (which should be normally a
-receiver).
+When the memory usage exceeds the soft limit the processor will enter the memory limited
+mode and will start refusing data by returning errors to the preceding component
+in the pipeline that made the ConsumeLogs/Traces/Metrics function call.
+The preceding component should normally be a receiver.
 
-When the memory usage is above the hard limit in addition to dropping the data the
+In memory limited mode the error returned by the ConsumeLogs/Traces/Metrics functions is a
+non-permanent error. When receivers see this error they are expected to retry sending
+the same data. The receivers may also apply backpressure to their data sources
+in order to slow down the inflow of data into the Collector and allow the memory usage
+to go below the limits.
+
+>Warning: if the component preceding the memory limiter in the pipeline does not correctly
+retry and resend the data after the ConsumeLogs/Traces/Metrics functions return an error, then
+that data will be permanently lost. We consider such components incorrectly implemented.
+
+When the memory usage is above the hard limit, in addition to refusing the data, the
 processor will forcedly perform garbage collection in order to try to free memory.
 
 When the memory usage drop below the soft limit, the normal operation is resumed (data
-will not longer be dropped and no forced garbage collection will be performed).
+will no longer be refused and no forced garbage collection will be performed).
 
 The difference between the soft limit and hard limits is defined via `spike_limit_mib`
 configuration option. The value of this option should be selected in a way that ensures
@@ -39,8 +50,9 @@ A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger
 Note that while the processor can help mitigate out of memory situations,
 it is not a replacement for properly sizing and configuring the
 collector. Keep in mind that if the soft limit is crossed, the collector will
-return errors to all receive operations until enough memory is freed. This will
-result in dropped data.
+return errors to all receive operations until enough memory is freed. This may
+eventually result in dropped data since the receivers may not be able to hold back
+and retry the data indefinitely.
 
 It is highly recommended to configure `ballastextension` as well as the
 `memory_limiter` processor on every collector.
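To make the relationship between the two limits described in the README text concrete, here is a minimal Go sketch of the arithmetic. The `limits` type and the `hardLimitMiB`/`softLimitMiB` names are invented for illustration and are not the processor's internal API; the only facts taken from the README are that the hard limit corresponds to `limit_mib` and that the soft limit sits `spike_limit_mib` below it.

```go
package main

import "fmt"

// Illustrative names only; the real processor derives these values from its
// configuration (limit_mib / spike_limit_mib, or their percentage equivalents).
type limits struct {
	hardLimitMiB  uint64 // limit_mib: above this, data is refused and GC is forced
	spikeLimitMiB uint64 // spike_limit_mib: headroom reserved for sudden allocation spikes
}

// softLimitMiB is where the processor enters memory limited mode and starts
// refusing data: the hard limit minus the spike headroom.
func (l limits) softLimitMiB() uint64 {
	return l.hardLimitMiB - l.spikeLimitMiB
}

func main() {
	// Using the suggested starting point of spike_limit_mib = 20% of the hard limit.
	l := limits{hardLimitMiB: 4000, spikeLimitMiB: 800}
	fmt.Printf("refuse data above %d MiB, force GC above %d MiB\n",
		l.softLimitMiB(), l.hardLimitMiB)
}
```

With the suggested 20% spike headroom, refusal therefore begins at roughly 80% of `limit_mib`, leaving the remaining 20% to absorb allocation spikes before the hard limit is hit.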
The ballast should be configured to diff --git a/processor/memorylimiterprocessor/config.go b/processor/memorylimiterprocessor/config.go index 5bf18b0d2d9..b81f2f84ecf 100644 --- a/processor/memorylimiterprocessor/config.go +++ b/processor/memorylimiterprocessor/config.go @@ -13,8 +13,7 @@ // limitations under the License. // Package memorylimiterprocessor provides a processor for OpenTelemetry Service pipeline -// that drops data on the pipeline according to the current state of memory -// usage. +// that refuses data on the pipeline according to the current state of memory usage. package memorylimiterprocessor // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor" import ( diff --git a/processor/memorylimiterprocessor/internal/mock_exporter.go b/processor/memorylimiterprocessor/internal/mock_exporter.go new file mode 100644 index 00000000000..73e76805926 --- /dev/null +++ b/processor/memorylimiterprocessor/internal/mock_exporter.go @@ -0,0 +1,78 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal" + +import ( + "context" + "sync/atomic" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" +) + +type MockExporter struct { + destAvailable int64 + acceptedLogCount int64 + deliveredLogCount int64 + Logs []plog.Logs +} + +var _ consumer.Logs = (*MockExporter)(nil) + +func (e *MockExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error { + atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount())) + + if atomic.LoadInt64(&e.destAvailable) == 1 { + // Destination is available, immediately deliver. + atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount())) + } else { + // Destination is not available. Queue the logs in the exporter. + e.Logs = append(e.Logs, ld) + } + return nil +} + +func (e *MockExporter) SetDestAvailable(available bool) { + if available { + // Pretend we delivered all queued accepted logs. + atomic.AddInt64(&e.deliveredLogCount, e.acceptedLogCount) + + // Get rid of the delivered logs so that memory can be collected. + e.Logs = nil + + // Now mark destination available so that subsequent ConsumeLogs + // don't queue the logs anymore. 
+ atomic.StoreInt64(&e.destAvailable, 1) + + } else { + atomic.StoreInt64(&e.destAvailable, 0) + } +} + +func (e *MockExporter) AcceptedLogCount() int { + return int(atomic.LoadInt64(&e.acceptedLogCount)) +} + +func (e *MockExporter) DeliveredLogCount() int { + return int(atomic.LoadInt64(&e.deliveredLogCount)) +} + +func NewMockExporter() *MockExporter { + return &MockExporter{} +} diff --git a/processor/memorylimiterprocessor/internal/mock_receiver.go b/processor/memorylimiterprocessor/internal/mock_receiver.go new file mode 100644 index 00000000000..5dd19ce1fa5 --- /dev/null +++ b/processor/memorylimiterprocessor/internal/mock_receiver.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal // import "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal" + +import ( + "context" + "strings" + "sync" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/pdata/plog" +) + +type MockReceiver struct { + ProduceCount int + NextConsumer consumer.Logs + lastConsumeResult error + mux sync.Mutex +} + +func (m *MockReceiver) Start() { + go m.produce() +} + +// This function demonstrates how the receivers should behave when the ConsumeLogs/Traces/Metrics +// call returns an error. +func (m *MockReceiver) produce() { + for i := 0; i < m.ProduceCount; i++ { + // Create a large log to consume some memory. + ld := plog.NewLogs() + lr := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + kiloStr := strings.Repeat("x", 10*1024) + lr.SetSeverityText(kiloStr) + + retry: + // Send to the pipeline. + err := m.NextConsumer.ConsumeLogs(context.Background(), ld) + + // Remember the result to be used in the tests. + m.mux.Lock() + m.lastConsumeResult = err + m.mux.Unlock() + + if err != nil { + // Sending to the pipeline failed. + if !consumererror.IsPermanent(err) { + // Retryable error. Try the same data again. + goto retry + } + // Permanent error. Drop it. + } + } +} + +func (m *MockReceiver) LastConsumeResult() error { + m.mux.Lock() + defer m.mux.Unlock() + return m.lastConsumeResult +} diff --git a/processor/memorylimiterprocessor/memorylimiter.go b/processor/memorylimiterprocessor/memorylimiter.go index 09c6042cb4f..6cad937a20e 100644 --- a/processor/memorylimiterprocessor/memorylimiter.go +++ b/processor/memorylimiterprocessor/memorylimiter.go @@ -39,9 +39,9 @@ const ( ) var ( - // errForcedDrop will be returned to callers of ConsumeTraceData to indicate - // that data is being dropped due to high memory usage. - errForcedDrop = errors.New("data dropped due to high memory usage") + // errDataRefused will be returned to callers of ConsumeTraceData to indicate + // that data is being refused due to high memory usage. 
+ errDataRefused = errors.New("data refused due to high memory usage") // Construction errors @@ -70,8 +70,8 @@ type memoryLimiter struct { memCheckWait time.Duration ballastSize uint64 - // forceDrop is used atomically to indicate when data should be dropped. - forceDrop *atomic.Bool + // mustRefuse is used to indicate when data should be refused. + mustRefuse *atomic.Bool ticker *time.Ticker @@ -129,7 +129,7 @@ func newMemoryLimiter(set processor.CreateSettings, cfg *Config) (*memoryLimiter ticker: time.NewTicker(cfg.CheckInterval), readMemStatsFn: runtime.ReadMemStats, logger: logger, - forceDrop: &atomic.Bool{}, + mustRefuse: &atomic.Bool{}, obsrep: obsrep, } @@ -180,15 +180,15 @@ func (ml *memoryLimiter) shutdown(context.Context) error { func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { numSpans := td.SpanCount() - if ml.forceDrop.Load() { + if ml.mustRefuse.Load() { // TODO: actually to be 100% sure that this is "refused" and not "dropped" // it is necessary to check the pipeline to see if this is directly connected // to a receiver (ie.: a receiver is on the call stack). For now it // assumes that the pipeline is properly configured and a receiver is on the - // callstack. + // callstack and that the receiver will correctly retry the refused data again. ml.obsrep.TracesRefused(ctx, numSpans) - return td, errForcedDrop + return td, errDataRefused } // Even if the next consumer returns error record the data as accepted by @@ -199,14 +199,14 @@ func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (p func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { numDataPoints := md.DataPointCount() - if ml.forceDrop.Load() { + if ml.mustRefuse.Load() { // TODO: actually to be 100% sure that this is "refused" and not "dropped" // it is necessary to check the pipeline to see if this is directly connected // to a receiver (ie.: a receiver is on the call stack). For now it // assumes that the pipeline is properly configured and a receiver is on the // callstack. ml.obsrep.MetricsRefused(ctx, numDataPoints) - return md, errForcedDrop + return md, errDataRefused } // Even if the next consumer returns error record the data as accepted by @@ -217,7 +217,7 @@ func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics) func (ml *memoryLimiter) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { numRecords := ld.LogRecordCount() - if ml.forceDrop.Load() { + if ml.mustRefuse.Load() { // TODO: actually to be 100% sure that this is "refused" and not "dropped" // it is necessary to check the pipeline to see if this is directly connected // to a receiver (ie.: a receiver is on the call stack). For now it @@ -225,7 +225,7 @@ func (ml *memoryLimiter) processLogs(ctx context.Context, ld plog.Logs) (plog.Lo // callstack. ml.obsrep.LogsRefused(ctx, numRecords) - return ld, errForcedDrop + return ld, errDataRefused } // Even if the next consumer returns error record the data as accepted by @@ -288,33 +288,33 @@ func (ml *memoryLimiter) checkMemLimits() { ms = ml.doGCandReadMemStats() } - // Remember current dropping state. - wasForcingDrop := ml.forceDrop.Load() + // Remember current state. + wasRefusing := ml.mustRefuse.Load() // Check if the memory usage is above the soft limit. 
- mustForceDrop := ml.usageChecker.aboveSoftLimit(ms) + mustRefuse := ml.usageChecker.aboveSoftLimit(ms) - if wasForcingDrop && !mustForceDrop { - // Was previously dropping but enough memory is available now, no need to limit. + if wasRefusing && !mustRefuse { + // Was previously refusing but enough memory is available now, no need to limit. ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms)) } - if !wasForcingDrop && mustForceDrop { + if !wasRefusing && mustRefuse { // We are above soft limit, do a GC if it wasn't done recently and see if // it brings memory usage below the soft limit. if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited { ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms)) ms = ml.doGCandReadMemStats() // Check the limit again to see if GC helped. - mustForceDrop = ml.usageChecker.aboveSoftLimit(ms) + mustRefuse = ml.usageChecker.aboveSoftLimit(ms) } - if mustForceDrop { - ml.logger.Warn("Memory usage is above soft limit. Dropping data.", memstatToZapField(ms)) + if mustRefuse { + ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms)) } } - ml.forceDrop.Store(mustForceDrop) + ml.mustRefuse.Store(mustRefuse) } type memUsageChecker struct { diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 19b910f728c..11dd1743eed 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -35,6 +35,8 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal" "go.opentelemetry.io/collector/processor/processorhelper" "go.opentelemetry.io/collector/processor/processortest" ) @@ -112,7 +114,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { usageChecker: memUsageChecker{ memAllocLimit: 1024, }, - forceDrop: &atomic.Bool{}, + mustRefuse: &atomic.Bool{}, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, @@ -140,7 +142,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Above memAllocLimit. currentMemAlloc = 1800 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) + assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md)) // Check ballast effect ml.ballastSize = 1000 @@ -153,7 +155,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.checkMemLimits() - assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) + assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md)) // Restore ballast to default. ml.ballastSize = 0 @@ -169,7 +171,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Above memSpikeLimit. currentMemAlloc = 550 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, md)) + assert.Equal(t, errDataRefused, mp.ConsumeMetrics(ctx, md)) } @@ -181,7 +183,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { usageChecker: memUsageChecker{ memAllocLimit: 1024, }, - forceDrop: &atomic.Bool{}, + mustRefuse: &atomic.Bool{}, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, @@ -209,7 +211,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Above memAllocLimit. 
currentMemAlloc = 1800 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) + assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td)) // Check ballast effect ml.ballastSize = 1000 @@ -222,7 +224,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.checkMemLimits() - assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) + assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td)) // Restore ballast to default. ml.ballastSize = 0 @@ -238,7 +240,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Above memSpikeLimit. currentMemAlloc = 550 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) + assert.Equal(t, errDataRefused, tp.ConsumeTraces(ctx, td)) } @@ -250,7 +252,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { usageChecker: memUsageChecker{ memAllocLimit: 1024, }, - forceDrop: &atomic.Bool{}, + mustRefuse: &atomic.Bool{}, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, @@ -278,7 +280,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Above memAllocLimit. currentMemAlloc = 1800 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) + assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld)) // Check ballast effect ml.ballastSize = 1000 @@ -291,7 +293,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.checkMemLimits() - assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) + assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld)) // Restore ballast to default. ml.ballastSize = 0 @@ -307,7 +309,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Above memSpikeLimit. 
currentMemAlloc = 550 ml.checkMemLimits() - assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) + assert.Equal(t, errDataRefused, lp.ConsumeLogs(ctx, ld)) } func TestGetDecision(t *testing.T) { @@ -349,7 +351,7 @@ func TestGetDecision(t *testing.T) { }) } -func TestDropDecision(t *testing.T) { +func TestRefuseDecision(t *testing.T) { decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30) require.NoError(t, err) decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50) @@ -364,46 +366,46 @@ func TestDropDecision(t *testing.T) { name string usageChecker memUsageChecker ms *runtime.MemStats - shouldDrop bool + shouldRefuse bool }{ { - name: "should drop over limit", + name: "should refuse over limit", usageChecker: *decison1000Limit30Spike30, ms: &runtime.MemStats{Alloc: 600}, - shouldDrop: true, + shouldRefuse: true, }, { - name: "should not drop", + name: "should not refuse", usageChecker: *decison1000Limit30Spike30, ms: &runtime.MemStats{Alloc: 100}, - shouldDrop: false, + shouldRefuse: false, }, { - name: "should not drop spike, fixed usageChecker", + name: "should not refuse spike, fixed usageChecker", usageChecker: memUsageChecker{ memAllocLimit: 600, memSpikeLimit: 500, }, - ms: &runtime.MemStats{Alloc: 300}, - shouldDrop: true, + ms: &runtime.MemStats{Alloc: 300}, + shouldRefuse: true, }, { - name: "should drop, spike, percentage usageChecker", + name: "should refuse, spike, percentage usageChecker", usageChecker: *decison1000Limit60Spike50, ms: &runtime.MemStats{Alloc: 300}, - shouldDrop: true, + shouldRefuse: true, }, { - name: "should drop, spike, percentage usageChecker", + name: "should refuse, spike, percentage usageChecker", usageChecker: *decison1000Limit40Spike20, ms: &runtime.MemStats{Alloc: 250}, - shouldDrop: true, + shouldRefuse: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - shouldDrop := test.usageChecker.aboveSoftLimit(test.ms) - assert.Equal(t, test.shouldDrop, shouldDrop) + shouldRefuse := test.usageChecker.aboveSoftLimit(test.ms) + assert.Equal(t, test.shouldRefuse, shouldRefuse) }) } } @@ -452,3 +454,86 @@ func newObsReport(t *testing.T) *obsreport.Processor { return proc } + +func TestNoDataLoss(t *testing.T) { + // Create an exporter. + exporter := internal.NewMockExporter() + + // Mark exporter's destination unavailable. The exporter will accept data and will queue it, + // thus increasing the memory usage of the Collector. + exporter.SetDestAvailable(false) + + // Create a memory limiter processor. + + cfg := createDefaultConfig().(*Config) + + // Check frequently to make the test quick. + cfg.CheckInterval = time.Millisecond * 10 + + // By how much we expect memory usage to increase because of queuing up of produced data. + const expectedMemoryIncreaseMiB = 10 + + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + + // Set the limit to current usage plus expected increase. This means initially we will not be limited. + cfg.MemoryLimitMiB = uint32(ms.Alloc/(1024*1024) + expectedMemoryIncreaseMiB) + cfg.MemorySpikeLimitMiB = 1 + + set := processortest.NewNopCreateSettings() + + limiter, err := newMemoryLimiter(set, cfg) + require.NoError(t, err) + + processor, err := processorhelper.NewLogsProcessor(context.Background(), processor.CreateSettings{}, cfg, exporter, + limiter.processLogs, + processorhelper.WithStart(limiter.start), + processorhelper.WithShutdown(limiter.shutdown)) + require.NoError(t, err) + + // Create a receiver. 
+ + receiver := &internal.MockReceiver{ + ProduceCount: 1e5, // Must produce enough logs to make sure memory increases by at least expectedMemoryIncreaseMiB + NextConsumer: processor, + } + + err = processor.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + // Start producing data. + receiver.Start() + + // The exporter was created such that its destination is not available. + // This will result in queuing of produced data inside the exporter and memory usage + // will increase. + + // We must eventually hit the memory limit and the receiver must see an error from memory limiter. + require.Eventually(t, func() bool { + // Did last ConsumeLogs call return an error? + return receiver.LastConsumeResult() != nil + }, 5*time.Second, 1*time.Millisecond) + + // We are now memory limited and receiver can't produce data anymore. + + // Now make the exporter's destination available. + exporter.SetDestAvailable(true) + + // We should now see that exporter's queue is purged and memory usage goes down. + + // Eventually we must see that receiver's ConsumeLog call returns success again. + require.Eventually(t, func() bool { + return receiver.LastConsumeResult() == nil + }, 5*time.Second, 1*time.Millisecond) + + // And eventually the exporter must confirm that it delivered exact number of produced logs. + require.Eventually(t, func() bool { + return receiver.ProduceCount == exporter.DeliveredLogCount() + }, 5*time.Second, 1*time.Millisecond) + + // Double check that the number of logs accepted by exporter matches the number of produced by receiver. + assert.Equal(t, receiver.ProduceCount, exporter.AcceptedLogCount()) + + err = processor.Shutdown(context.Background()) + require.NoError(t, err) +}
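For readers wiring their own receivers to a pipeline that contains the memory limiter, the pattern exercised by MockReceiver and TestNoDataLoss above can be summarized in a small, self-contained sketch. It is only an illustration, not a prescribed implementation: the `sendWithRetry` helper and the 100 ms pause are invented here, while `consumer.Logs` and `consumererror.IsPermanent` are the real collector APIs already used elsewhere in this change.

```go
package example

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/plog"
)

// sendWithRetry keeps resending the same payload while the pipeline reports a
// non-permanent error (such as the memory limiter refusing data), and gives up
// only on permanent errors or context cancellation.
func sendWithRetry(ctx context.Context, next consumer.Logs, ld plog.Logs) error {
	for {
		err := next.ConsumeLogs(ctx, ld)
		if err == nil {
			return nil // delivered; the upstream source can now be acknowledged
		}
		if consumererror.IsPermanent(err) {
			return err // unprocessable data; retrying would not help
		}
		// Non-permanent error: keep the data, apply backpressure by pausing,
		// then try the same payload again.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}
```

Compared with the tight `goto retry` loop in MockReceiver, the pause here models the backpressure the README asks receivers to apply to their data sources while the Collector's memory usage recovers.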