Skip to content

Commit

Permalink
GetNextJobId() and AddJob() are working for Mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
ezavada committed Aug 17, 2024
1 parent bf73f65 commit 6c2d5df
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 21 deletions.
8 changes: 4 additions & 4 deletions jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,14 @@ func New[T any](
opt(jq)
}

ctx, cancel := context.WithCancel(context.Background())
jq.cancel = cancel

// Open JobQueue DB
var db JobQueueDb[T]
if jq.dbUseMongo {
dbPath = jq.dbPath // this will have been set by the options
db = NewJobQueueDbMongo[T]()
db = NewJobQueueDbMongo[T](ctx)
} else {
db = NewJobQueueDbBadger[T](jq.dbInMemory)
}
Expand All @@ -132,9 +135,6 @@ func New[T any](

jq.logger.Info().Msg("Starting job queue")

ctx, cancel := context.WithCancel(context.Background())
jq.cancel = cancel

// Load jobs from JobQueue DB
go jq.pollJobs(ctx)

Expand Down
66 changes: 52 additions & 14 deletions jobqueue_db_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,32 @@ package jobqueue

import (
"context"
"errors"
"fmt"

// "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// JobQueueDbMongo is the MongoDB implementation of the JobQueueDb interface
type JobQueueDbMongo[T any] struct {
client *mongo.Client
ctx context.Context
db *mongo.Database
coll *mongo.Collection
client *mongo.Client
ctx context.Context
db *mongo.Database
coll *mongo.Collection
idColl *mongo.Collection
jobQueueName string
}

// NewJobQueueDbMongo creates a new JobQueueDbMongo instance
func NewJobQueueDbMongo[T any]() JobQueueDb[T] {
func NewJobQueueDbMongo[T any](ctx context.Context) JobQueueDb[T] {
return &JobQueueDbMongo[T]{
client: nil,
ctx: context.TODO(),
db: nil,
coll: nil,
client: nil,
ctx: ctx,
db: nil,
coll: nil,
jobQueueName: "",
}
}

Expand All @@ -39,9 +43,16 @@ func (jqdb *JobQueueDbMongo[T]) Open(path string, queueName string) error {
if jqdb.db == nil {
return fmt.Errorf("failed to open mongo database job_queues")
}
jqdb.coll = jqdb.db.Collection(queueName + "_jobs")
// holds the jobs for the queue
jqdb.jobQueueName = queueName + "_jobs"
jqdb.coll = jqdb.db.Collection(jqdb.jobQueueName)
if jqdb.coll == nil {
return fmt.Errorf("failed to open collection job_queues.%s_jobs", queueName)
return fmt.Errorf("failed to open collection job_queues.%s", jqdb.jobQueueName)
}
// holds the job IDs for all queues
jqdb.idColl = jqdb.db.Collection("job_ids")
if jqdb.idColl == nil {
return fmt.Errorf("failed to open collection job_queues.job_ids")
}
return nil
}
Expand All @@ -54,7 +65,30 @@ func (jqdb *JobQueueDbMongo[T]) Close() error {

// GetNextJobId() (uint64, error)
func (jqdb *JobQueueDbMongo[T]) GetNextJobId() (uint64, error) {
return 0, nil

var nextJobId uint64
result := jqdb.idColl.FindOneAndUpdate(jqdb.ctx,
bson.D{{Key: "queue", Value: jqdb.jobQueueName}}, // selector
bson.D{{Key: "$inc", Value: bson.D{{Key: "next_job_id", Value: 1}}}}) // update, increment next_job_id by 1, return old record
if result.Err() != nil {
if errors.Is(result.Err(), mongo.ErrNoDocuments) {
// insert the queue if it doesn't exist. We start with ID 2 because we are returning 1
_, err := jqdb.idColl.InsertOne(jqdb.ctx, bson.D{{Key: "queue", Value: jqdb.jobQueueName}, {Key: "next_job_id", Value: 2}})
if err != nil {
return 0, fmt.Errorf("failed to create initial mongo record for next job id: %w", err)
}
nextJobId = 1
} else {
return 0, fmt.Errorf("failed to get next job id: %w", result.Err())
}
}
raw, err := result.Raw()
if err != nil {
return 0, fmt.Errorf("failed to get raw result from mongo: %w", err)
}
val := raw.Lookup("next_job_id")
nextJobId = uint64(val.AsInt64())
return nextJobId, nil
}

// FetchJobs(count int) ([]*job[T], error)
Expand All @@ -74,7 +108,11 @@ func (jqdb *JobQueueDbMongo[T]) UpdateJob(job *job[T]) error {

// AddJob(job *job[T]) (uint64, error) // returns the job ID
func (jqdb *JobQueueDbMongo[T]) AddJob(job *job[T]) (uint64, error) {
return 0, nil
_, err := jqdb.coll.InsertOne(jqdb.ctx, job)
if err != nil {
return 0, fmt.Errorf("failed to insert job into mongo collection: %w", err)
}
return job.ID, nil
}

// DeleteJob(jobID uint64) error
Expand Down
6 changes: 3 additions & 3 deletions jobqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ func TestJobConcurrency(t *testing.T) {
// create initial job queue

// mongo version
//jq, err := New[testJob]("ignored", "test-job", 5, complexJobHandler(),
// UseMongoDB[testJob]("mongodb://localhost:27017"))
jq, err := New[testJob]("ignored", "test-job", 5, complexJobHandler(),
UseMongoDB[testJob]("mongodb://localhost:27017"))

// badger version
jq, err := New[testJob]("/tmp/badger", "test-job", 5, complexJobHandler())
//jq, err := New[testJob]("/tmp/badger", "test-job", 5, complexJobHandler())

assert.NoError(t, err)
t.Cleanup(func() {
Expand Down

0 comments on commit 6c2d5df

Please sign in to comment.