Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: vaccum handling in error detail reports table #4945

Merged
merged 9 commits into from
Aug 5, 2024
Merged
78 changes: 57 additions & 21 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"github.com/samber/lo"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -80,6 +81,9 @@
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement

stats stats.Stats
config *config.Config
}

type errorDetails struct {
Expand All @@ -90,16 +94,18 @@
func NewErrorDetailReporter(
ctx context.Context,
configSubscriber *configSubscriber,
stats stats.Stats,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏 for injecting stats

conf *config.Config,
) *ErrorDetailReporter {
tr := &http.Transport{}
reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com")
reportingServiceURL := conf.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com")
reportingServiceURL = strings.TrimSuffix(reportingServiceURL, "/")

netClient := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
mainLoopSleepInterval := config.GetReloadableDurationVar(5, time.Second, "Reporting.mainLoopSleepInterval")
sleepInterval := config.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := config.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := config.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections")
netClient := &http.Client{Transport: tr, Timeout: conf.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
mainLoopSleepInterval := conf.GetReloadableDurationVar(5, time.Second, "Reporting.mainLoopSleepInterval")
sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := conf.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections")

log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting")
extractor := NewErrorDetailExtractor(log)
Expand All @@ -117,13 +123,15 @@
httpClient: netClient,

namespace: config.GetKubeNamespace(),
instanceID: config.GetString("INSTANCE_ID", "1"),
region: config.GetString("region", ""),
instanceID: conf.GetString("INSTANCE_ID", "1"),
region: conf.GetString("region", ""),

configSubscriber: configSubscriber,
syncers: make(map[string]*types.SyncSource),
errorDetailExtractor: extractor,
maxOpenConnections: maxOpenConnections,
stats: stats,
config: conf,
}
}

Expand All @@ -143,7 +151,7 @@
}
edr.syncers[c.ConnInfo] = &types.SyncSource{SyncerConfig: c, DbHandle: dbHandle}

if !config.GetBool("Reporting.errorReporting.syncer.enabled", true) {
if !edr.config.GetBool("Reporting.errorReporting.syncer.enabled", true) {

Check warning on line 154 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L154

Added line #L154 was not covered by tests
return func() {}
}

Expand Down Expand Up @@ -197,7 +205,7 @@
// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse)

stats.Default.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{

Check warning on line 208 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L208

Added line #L208 was not covered by tests
"errorCode": errDets.ErrorCode,
"workspaceId": workspaceID,
"destType": destinationDetail.destType,
Expand Down Expand Up @@ -256,7 +264,7 @@
Handle: dbHandle,
MigrationsTable: fmt.Sprintf("%v_migrations", ErrorDetailReportsTable),
// TODO: shall we use separate env ?
ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true),
ShouldForceSetLowerVersion: edr.config.GetBool("SQLMigrator.forceSetLowerVersion", true),

Check warning on line 267 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L267

Added line #L267 was not covered by tests
}
err = m.Migrate(ErrorDetailReportsTable)
if err != nil {
Expand Down Expand Up @@ -297,17 +305,17 @@

tags := edr.getTags(c.Label)

mainLoopTimer := stats.Default.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags)
getReportsTimer := stats.Default.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags)
aggregateTimer := stats.Default.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags)
getReportsSize := stats.Default.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags)
getAggregatedReportsSize := stats.Default.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags)
mainLoopTimer := edr.stats.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags)
getReportsTimer := edr.stats.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags)
aggregateTimer := edr.stats.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags)
getReportsSize := edr.stats.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags)
getAggregatedReportsSize := edr.stats.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags)

Check warning on line 312 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L308-L312

Added lines #L308 - L312 were not covered by tests

errorDetailReportsDeleteQueryTimer := stats.Default.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags)
errorDetailReportsDeleteQueryTimer := edr.stats.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags)

Check warning on line 314 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L314

Added line #L314 was not covered by tests

edr.minReportedAtQueryTime = stats.Default.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = stats.Default.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = stats.Default.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)
edr.minReportedAtQueryTime = edr.stats.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = edr.stats.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = edr.stats.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)

Check warning on line 318 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L316-L318

Added lines #L316 - L318 were not covered by tests

// In infinite loop
// Get Reports
Expand Down Expand Up @@ -375,6 +383,8 @@
if err != nil {
edr.log.Errorf("[ Error Detail Reporting ]: Error deleting local reports from %s: %v", ErrorDetailReportsTable, err)
}
// vacuum error_reports_details table
edr.vacuum(ctx, dbHandle, c)

Check warning on line 387 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L387

Added line #L387 was not covered by tests
}

mainLoopTimer.Since(loopStart)
Expand All @@ -386,6 +396,32 @@
}
}

func (edr *ErrorDetailReporter) vacuum(ctx context.Context, dbHandle *sql.DB, c types.SyncerConfig) {
tags := edr.getTags(c.Label)
var sizeEstimate int64
if err := dbHandle.QueryRowContext(
ctx,
`SELECT pg_table_size(oid) from pg_class where relname = $1`, ErrorDetailReportsTable,
).Scan(&sizeEstimate); err != nil {
edr.log.Errorn(
fmt.Sprintf(`Error getting %s table size estimate`, ErrorDetailReportsTable),
logger.NewErrorField(err),
)

Check warning on line 409 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L399-L409

Added lines #L399 - L409 were not covered by tests
}
if sizeEstimate > edr.config.GetInt64("Reporting.errorReporting.vacuumThresholdBytes", 5*bytesize.GB) {
vacuumStart := time.Now()
vacuumDuration := edr.stats.NewTaggedStat(StatReportingVacuumDuration, stats.TimerType, tags)
vaccumStatement := fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable))
if _, err := dbHandle.ExecContext(ctx, vaccumStatement); err != nil {
edr.log.Errorn(
fmt.Sprintf(`Error vacuuming %s table`, ErrorDetailReportsTable),
logger.NewErrorField(err),
)

Check warning on line 419 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L411-L419

Added lines #L411 - L419 were not covered by tests
}
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
vacuumDuration.Since(vacuumStart)

Check warning on line 421 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L421

Added line #L421 was not covered by tests
}
}

func (edr *ErrorDetailReporter) getReports(ctx context.Context, currentMs int64, syncerKey string) ([]*types.EDReportsDB, int64) {
var queryMin sql.NullInt64
dbHandle, err := edr.getDBHandle(syncerKey)
Expand Down Expand Up @@ -602,7 +638,7 @@
edr.edReportingRequestLatency.Since(httpRequestStart)
httpStatTags := edr.getTags(label)
httpStatTags["status"] = strconv.Itoa(resp.StatusCode)
stats.Default.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment()
edr.stats.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment()

Check warning on line 641 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L641

Added line #L641 was not covered by tests

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke

// error reporting implementation
if config.GetBool("Reporting.errorReporting.enabled", false) {
errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber)
errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber, rm.stats, config.Default)
rm.reporters = append(rm.reporters, errorReporter)
}

Expand Down
5 changes: 3 additions & 2 deletions enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -415,7 +416,7 @@ func TestExtractErrorDetails(t *testing.T) {
},
}

edr := NewErrorDetailReporter(context.Background(), &configSubscriber{})
edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}, stats.NOP, config.Default)
for _, tc := range testCases {
t.Run(tc.caseDescription, func(t *testing.T) {
errorDetails := edr.extractErrorDetails(tc.inputErrMsg)
Expand Down Expand Up @@ -599,7 +600,7 @@ func TestAggregationLogic(t *testing.T) {
},
}
configSubscriber := newConfigSubscriber(logger.NOP)
ed := NewErrorDetailReporter(context.Background(), configSubscriber)
ed := NewErrorDetailReporter(context.Background(), configSubscriber, stats.NOP, config.Default)
reportingMetrics := ed.aggregate(dbErrs)

reportResults := []*types.EDMetric{
Expand Down
Loading