Skip to content

Commit

Permalink
fix build & tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
seizethedave committed Dec 19, 2024
1 parent 05db4f8 commit af40a28
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (b *BlockBuilder) makeSchedulerClient() (schedulerpb.SchedulerClient, error
b.logger,
b.cfg.SchedulerConfig.UpdateInterval,
b.cfg.SchedulerConfig.MaxUpdateAge,
), nil
)
}

func (b *BlockBuilder) starting(context.Context) (err error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (s *BlockBuilderScheduler) running(ctx context.Context) error {
}
}

// completeObservationMode transitions the scheduler from observation mode to normal operation.
func (s *BlockBuilderScheduler) completeObservationMode() {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -331,7 +332,7 @@ func (s *BlockBuilderScheduler) updateJob(key jobKey, workerID string, complete
}

if c, ok := s.committed.Lookup(s.cfg.Kafka.Topic, j.Partition); ok {
if j.StartOffset <= c.At {
if j.EndOffset <= c.At {
// Update of a completed/committed job. Ignore.
level.Debug(logger).Log("msg", "ignored historical job")
return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@ func TestOffsetMovement(t *testing.T) {
}
sched.completeObservationMode()

spec := jobSpec{
topic: "ingest",
partition: 1,
commitRecTs: time.Unix(200, 0),
startOffset: 5000,
endOffset: 6000,
spec := schedulerpb.JobSpec{
Topic: "ingest",
Partition: 1,
CommitRecTs: time.Unix(200, 0),
StartOffset: 5000,
EndOffset: 6000,
}

sched.jobs.addOrUpdate("ingest/1/5524", spec)
Expand Down

0 comments on commit af40a28

Please sign in to comment.