Skip to content

Commit

Permalink
Improve unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Jan 9, 2023
1 parent 6b47cf8 commit 1c869eb
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,8 +967,14 @@ func TestCheckpointing(t *testing.T) {

enc := MakeElementEncoder(wvERSCoder)
var buf bytes.Buffer
if err := enc.Encode(value, &buf); err != nil {
t.Fatalf("couldn't encode value: %v", err)

// We encode the element several times to ensure we don't
// drop any residuals, the root of issue #24931.
wantCount := 3
for i := 0; i < wantCount; i++ {
if err := enc.Encode(value, &buf); err != nil {
t.Fatalf("couldn't encode value: %v", err)
}
}

if err := root.StartBundle(ctx, "testBund", DataContext{
Expand All @@ -981,20 +987,22 @@ func TestCheckpointing(t *testing.T) {
}
cps, err := root.Process(ctx)
if err != nil {
t.Fatalf("checkpointThis() = %v, %v, want nil", cps, err)
}
if len(cps) == 0 {
t.Fatalf("checkpointThis() = %v, want not non-empty", cps)
}
cp := cps[0]
if got, want := cp.Reapply, time.Minute; got != want {
t.Errorf("checkpointThis(delay(%v)) delay = %v, want %v", want, got, want)
t.Fatalf("Process() = %v, %v, want nil", cps, err)
}
if got, want := cp.SR.TId, root.Out.(*ProcessSizedElementsAndRestrictions).TfId; got != want {
t.Errorf("checkpointThis() transformID = %v, want %v", got, want)
if got, want := len(cps), wantCount; got != want {
t.Fatalf("Process() = len %v checkpoints, want %v", got, want)
}
if got, want := cp.SR.InId, "i0"; got != want {
t.Errorf("checkpointThis() transformID = %v, want %v", got, want)
// Check each checkpoint has the expected values.
for _, cp := range cps {
if got, want := cp.Reapply, time.Minute; got != want {
t.Errorf("Process(delay(%v)) delay = %v, want %v", want, got, want)
}
if got, want := cp.SR.TId, root.Out.(*ProcessSizedElementsAndRestrictions).TfId; got != want {
t.Errorf("Process() transformID = %v, want %v", got, want)
}
if got, want := cp.SR.InId, "i0"; got != want {
t.Errorf("Process() transformID = %v, want %v", got, want)
}
}
})
}
Expand Down

0 comments on commit 1c869eb

Please sign in to comment.