Skip to content

Commit

Permalink
Add guarantee checks to TestBatches test
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy authored and mperham committed Feb 6, 2024
1 parent e57b83b commit a5facbc
Showing 1 changed file with 117 additions and 7 deletions.
124 changes: 117 additions & 7 deletions test/ent/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@ package tester
import (
"os"
"testing"
"time"

"github.com/contribsys/faktory/client"
"github.com/stretchr/testify/assert"
)

// Since tests are run in parallel, better use test functions names (at least)
// as queue names for isolation
func localJob(cl *client.Client, jobtype string, q string) *client.Job {
var j = client.NewJob(jobtype, "what", "ever")
j.Queue = q
return j
}

func TestBatches(t *testing.T) {
var entFaktotyIntegrationTest = os.Getenv("FAKTORY_URL") != "" && os.Getenv("FAKTORY_ENT") == "1"
var entFaktotyIntegrationTest = os.Getenv("FAKTORY_URL") != "" && os.Getenv("FAKTORY_ENT") == "true"
if !entFaktotyIntegrationTest {
return
}
Expand All @@ -21,12 +30,113 @@ func TestBatches(t *testing.T) {
// create a batch
batch, createBatchError := cl.BatchNew(&client.Batch{
Description: "We will try to do some weird stuff with this batch and see what happens.",
Complete: localJob(cl, "OnParentComplete", "TestBatches_ParentCallbacksQueue"),
Success: localJob(cl, "OnParentSuccess", "TestBatches_ParentCallbacksQueue"),
})

// push a couple of jobs but do not commit just
assert.NoError(t, createBatchError, "Batch should be registered just fine by Ent (!?) Faktory.")
batch.Push(localJob(cl, "Common", "TestBatches"))
batch.Push(localJob(cl, "Common", "TestBatches"))

// let's try to open a batch we have never committed ...
openedBatch, openError := cl.BatchOpen(batch.Bid)
// .. no error from the server (somehting not mentioned in the docs, just checking)
assert.NoError(t, openError, "We can open a batch that we have not committed.")

// ok, let's commit the batch now
assert.NoError(t, batch.Commit(), "Batch committed (and now cannot be pushed into from outside).")

// From the docs:
// """Note that, once committed, only a job within the batch may reopen it.
// Faktory will return an error if you dynamically add jobs from "outside" the batch;
// this is to prevent a race condition between callbacks firing and an outsider adding more jobs."""
// Ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#batch-open-bid

// let's try to open an already committed batch
reOpenedBatch, reOpenError := cl.BatchOpen(openedBatch.Bid)
assert.NoError(t, reOpenError, "We can re-open an already committed batch from outside.")

// let's push some extra jobs from outside and re-commit the batch
assert.NoError(t, reOpenedBatch.Push(localJob(cl, "Extra", "TestBatches")), "Extra job can be pushed from outside.")
assert.NoError(t, reOpenedBatch.Push(localJob(cl, "Extra", "TestBatches")), "Extra job can be pushed from outside.")
assert.NoError(t, reOpenedBatch.Commit(), "Batch can be re-committed from outside.")

// let's open this batch once again, add a nested batch, and commit
reOpenedAgainBatch, reOpenAgainError := cl.BatchOpen(reOpenedBatch.Bid)
assert.NoError(t, reOpenAgainError, "We can re-open the batch yet again.")
nestedBatch, createNestedBatchError := cl.BatchNew(&client.Batch{
Description: "We will try to do some weird stuff with this batch as well",
ParentBid: reOpenedAgainBatch.Bid,
Complete: localJob(cl, "OnNestedComplete", "TestBatches_CallbacksQueue"),
Success: localJob(cl, "OnNestedSuccess", "TestBatches_CallbacksQueue"),
})
assert.NoError(t, createNestedBatchError, "Can create a nested batch.")
assert.NoError(t, nestedBatch.Push(localJob(cl, "NestedJob", "TestBatches_Nested")), "Job can be pushed into nested batch from outside.")
assert.NoError(t, nestedBatch.Push(localJob(cl, "NestedJob", "TestBatches_Nested")), "Job can be pushed into nested batch from outside.")
assert.NoError(t, nestedBatch.Commit(), "Nested batch can be committed from outside.")
assert.NoError(t, reOpenedAgainBatch.Commit(), "Again committed 3rd time.")

// check nested batch status
batchStatus, getStatusError := cl.BatchStatus(nestedBatch.Bid)
assert.NoError(t, getStatusError, "Retrieved nested batch status just fine")
assert.Equal(t, int64(2), batchStatus.Total)
assert.Equal(t, int64(2), batchStatus.Pending)
assert.Equal(t, "", batchStatus.CompleteState) // pending
assert.Equal(t, "", batchStatus.SuccessState) // pending

// From the docs:
// """Once a callback has enqueued for a batch, you may not add anything to the batch."""
// ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#guarantees

// Let's consume the jobs from the out nested batch and make Faktory fire those
// "OnNestedComplete" callback and "OnNestedSuccess" callback

// job 1
j, fetchErr := cl.Fetch("TestBatches_Nested")
assert.NoError(t, fetchErr, "fetched ok")
assert.Equal(t, "NestedJob", j.Type, "expected a job of type 'NestedJob'")
cl.Ack(j.Jid)

// job 2
j, fetchErr = cl.Fetch("TestBatches_Nested")
assert.NoError(t, fetchErr, "fetched ok")
assert.Equal(t, "NestedJob", j.Type, "expected a job of type 'NestedJob'")
cl.Ack(j.Jid)

// let Faktory enqueue the callbacks
time.Sleep(time.Duration(2) * time.Second)

// check nested batch status again
batchStatus, getStatusError = cl.BatchStatus(nestedBatch.Bid)
assert.NoError(t, getStatusError, "Retrieved nested batch status just fine")
assert.Equal(t, int64(2), batchStatus.Total)
assert.Equal(t, int64(0), batchStatus.Pending) // job 1 and job 2 consumed!
assert.Equal(t, int64(0), batchStatus.Failed)
assert.Equal(t, "1", batchStatus.CompleteState) // enqueued
assert.Equal(t, "", batchStatus.SuccessState) // still pending

// let's consume the "complete" callback ...
j, fetchErr = cl.Fetch("TestBatches_CallbacksQueue")
assert.NoError(t, fetchErr, "fetched ok")
assert.Equal(t, "OnNestedComplete", j.Type, "expected a job of type 'OnNestedComplete'")
cl.Ack(j.Jid)

// ... and check the status again
batchStatus, getStatusError = cl.BatchStatus(nestedBatch.Bid)
assert.NoError(t, getStatusError, "Retrieved nested batch status just fine")
assert.Equal(t, int64(2), batchStatus.Total)
assert.Equal(t, int64(0), batchStatus.Pending)
assert.Equal(t, int64(0), batchStatus.Failed)
assert.Equal(t, "2", batchStatus.CompleteState) // successfully completed
assert.Equal(t, "1", batchStatus.SuccessState) // enqueued

// push a couple of jobs and commit
assert.NoError(t, createBatchError, "Batch should be registered just fine by Ent (!?) Faktory")
batch.Push(client.NewJob("Common"))
batch.Push(client.NewJob("Common"))
commitBatchError := batch.Commit()
assert.NoError(t, commitBatchError, "Batch committed (and now cannot be pushed into from outside)")
// Final touch:
// Now with the callbacks enqueued, let's re-open the nested batch, and
// try to push some more jobs and commit
reOpenedNestedBatch, nestedReOpenError := cl.BatchOpen(batchStatus.Bid)
assert.NoError(t, nestedReOpenError, "We can re-open a batch for which the callbacks have already fired.")
assert.NoError(t, reOpenedNestedBatch.Push(localJob(cl, "DoesNotMatter", "TestBatches_Nested_2")), "Still can push.")
assert.NoError(t, reOpenedNestedBatch.Push(localJob(cl, "DoesNotMatter", "TestBatches_Nested_2")), "Still can push")
assert.NoError(t, reOpenedNestedBatch.Commit(), "And can also commit without error")
}

0 comments on commit a5facbc

Please sign in to comment.