From af40a28c2149745a81929bfe4eb9decc87a12a32 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 19 Dec 2024 13:00:45 -0800 Subject: [PATCH] fix build & tests. --- pkg/blockbuilder/blockbuilder.go | 2 +- pkg/blockbuilder/scheduler/scheduler.go | 3 ++- pkg/blockbuilder/scheduler/scheduler_test.go | 12 ++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go index 753e8d00b0..93a0e4a5fb 100644 --- a/pkg/blockbuilder/blockbuilder.go +++ b/pkg/blockbuilder/blockbuilder.go @@ -108,7 +108,7 @@ func (b *BlockBuilder) makeSchedulerClient() (schedulerpb.SchedulerClient, error b.logger, b.cfg.SchedulerConfig.UpdateInterval, b.cfg.SchedulerConfig.MaxUpdateAge, - ), nil + ) } func (b *BlockBuilder) starting(context.Context) (err error) { diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index f89fe766ce..a4fc8df455 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -133,6 +133,7 @@ func (s *BlockBuilderScheduler) running(ctx context.Context) error { } } +// completeObservationMode transitions the scheduler from observation mode to normal operation. func (s *BlockBuilderScheduler) completeObservationMode() { s.mu.Lock() defer s.mu.Unlock() @@ -331,7 +332,7 @@ func (s *BlockBuilderScheduler) updateJob(key jobKey, workerID string, complete } if c, ok := s.committed.Lookup(s.cfg.Kafka.Topic, j.Partition); ok { - if j.StartOffset <= c.At { + if j.EndOffset <= c.At { // Update of a completed/committed job. Ignore. level.Debug(logger).Log("msg", "ignored historical job") return nil diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index bbdbf7e3f4..0a4f1d7aab 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -316,12 +316,12 @@ func TestOffsetMovement(t *testing.T) { } sched.completeObservationMode() - spec := jobSpec{ - topic: "ingest", - partition: 1, - commitRecTs: time.Unix(200, 0), - startOffset: 5000, - endOffset: 6000, + spec := schedulerpb.JobSpec{ + Topic: "ingest", + Partition: 1, + CommitRecTs: time.Unix(200, 0), + StartOffset: 5000, + EndOffset: 6000, } sched.jobs.addOrUpdate("ingest/1/5524", spec)