theopenlane/riverboat

Riverboat

Riverboat is the job queue used in openlane. It is built on the riverqueue project.

Usage

Jobs can be inserted into the job queue either from this server directly or from any codebase with an insert-only river client. All jobs are processed by the riverboat server. Because each job is committed to the database within a transaction and persisted there, there is no need to worry about dropped events.

Getting Started

This repo includes several Taskfiles to assist with getting things running.

Dependencies

  • Go 1.23+
  • Docker (used for running Postgres and the river-ui)
  • task

Starting the Server

The following will start up Postgres, the river-ui, and the riverboat server:

task run-dev

Test Jobs

Included in the test/ directory are test jobs corresponding to the job types in pkg/jobs.

  1. Start the riverboat server using task run-dev

  2. Run the test main for the job you want to exercise, for example the email job:

    go run test/email/main.go
  3. The job should be inserted successfully and processed by river, and the email should be added to fixtures/email.

Adding New Jobs

  1. New jobs should be added to the pkg/jobs directory in a new file. Refer to the upstream docs for implementation details. The following is a stub job that can be copied to get you started.

    package jobs
    
    import (
       "context"
    
       "github.com/riverqueue/river"
       "github.com/rs/zerolog/log"
    )
    
    // ExampleArgs for the example worker to process the job
    type ExampleArgs struct {
       // ExampleArg is an example argument
       ExampleArg string `json:"example_arg"`
    }
    
    // Kind satisfies the river.Job interface
    func (ExampleArgs) Kind() string { return "example" }
    
    // ExampleWorker does all sorts of neat stuff
    type ExampleWorker struct {
       river.WorkerDefaults[ExampleArgs]
    
       ExampleConfig
    }
    
    // ExampleConfig contains the configuration for the example worker
    type ExampleConfig struct {
       // DevMode is a flag to enable dev mode so we don't actually send millions of carrier pigeons
       DevMode bool `koanf:"devMode" json:"devMode" jsonschema:"description=enable dev mode" default:"true"`
    }
    
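    // Work satisfies the river.Worker interface for the example worker
    func (w *ExampleWorker) Work(ctx context.Context, job *river.Job[ExampleArgs]) error {
       // do some work; this placeholder log call also keeps the zerolog import in use
       log.Debug().Str("example_arg", job.Args.ExampleArg).Msg("processing example job")
    
       return nil
    }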
  2. Add a test for the new job, using email_test.go as an example. Additional helper functions are available; see the river test helpers for details.

  3. If the job has configuration settings, add the worker to the Workers struct in pkg/river/config.go. This allows the config variables to be set via the koanf config setup. Once added, you will need to regenerate the config:

    task config:generate
  4. Register the worker by adding a river.AddWorkerSafely call to the createWorkers function in pkg/river/workers.go.

  5. Add a test job to the test/ directory by creating a new directory with a main.go file that inserts the job into the queue.
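
When defining new args, it can help to check how they will be serialized. River stores each job's kind string together with its JSON-encoded args, so the json tags on the args struct determine the shape of the stored payload. The following is a minimal sketch of that check using only the standard library. It assumes the ExampleArgs struct from the stub job above; encodeArgs and decodeArgs are hypothetical helpers written for this illustration and are not part of river or riverboat.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExampleArgs mirrors the args struct from the stub job.
type ExampleArgs struct {
	ExampleArg string `json:"example_arg"`
}

// Kind returns the string that identifies this job type.
func (ExampleArgs) Kind() string { return "example" }

// encodeArgs is a hypothetical helper that returns the kind and the
// JSON payload for a set of args, roughly what ends up stored per job.
func encodeArgs(args ExampleArgs) (kind string, payload string, err error) {
	b, err := json.Marshal(args)
	if err != nil {
		return "", "", err
	}
	return args.Kind(), string(b), nil
}

// decodeArgs is a hypothetical helper that reverses encodeArgs,
// similar to what happens before a worker receives its args.
func decodeArgs(payload string) (ExampleArgs, error) {
	var args ExampleArgs
	err := json.Unmarshal([]byte(payload), &args)
	return args, err
}

func main() {
	kind, payload, err := encodeArgs(ExampleArgs{ExampleArg: "hello"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("kind=%s args=%s\n", kind, payload)
	// prints: kind=example args={"example_arg":"hello"}

	decoded, err := decodeArgs(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.ExampleArg) // prints: hello
}
```

A check like this is a cheap way to catch a missing or misspelled json tag before the job reaches the queue. Because the kind string identifies the job type, it is worth keeping it unique across the files in pkg/jobs.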

Contributing

See the contributing guide for more information.