diff --git a/gateway/webhook/integration_test.go b/gateway/webhook/integration_test.go index 6a3ae97c4c..14cec08a29 100644 --- a/gateway/webhook/integration_test.go +++ b/gateway/webhook/integration_test.go @@ -89,6 +89,7 @@ func TestIntegrationWebhook(t *testing.T) { "gateway", jobsdb.WithDBHandle(p.DB), ) + require.NoError(t, gatewayDB.Start()) defer gatewayDB.TearDown() @@ -123,6 +124,7 @@ func TestIntegrationWebhook(t *testing.T) { bcs := make(map[string]backendconfig.ConfigT) testSetup := testcases.Load(t) + sourceConfigs := make([]backendconfig.SourceT, len(testSetup.Cases)) for i, tc := range testSetup.Cases { @@ -284,15 +286,19 @@ func TestIntegrationWebhook(t *testing.T) { assert.JSONEq(t, string(p), string(batch.Batch[0])) } - r, err = errDB.GetUnprocessed(ctx, jobsdb.GetQueryParams{ - WorkspaceID: workspaceID, - // ParameterFilters: []jobsdb.ParameterFilterT{{ - // Name: "source_id", - // Value: sourceID, - // }}, - JobsLimit: 1, - }) - require.NoError(t, err) + require.Eventually(t, func() bool { + r, err = errDB.GetUnprocessed(ctx, jobsdb.GetQueryParams{ + WorkspaceID: workspaceID, + ParameterFilters: []jobsdb.ParameterFilterT{{ + Name: "source_id", + Value: sourceID, + }}, + JobsLimit: 10, + }) + require.NoError(t, err) + return len(r.Jobs) == len(tc.Output.ErrQueue) + }, time.Second, time.Millisecond*10) + assert.Len(t, r.Jobs, len(tc.Output.ErrQueue)) for i, p := range tc.Output.ErrQueue { errPayload, err := json.Marshal(struct {