Skip to content

Commit

Permalink
Prepare Faktory 2.x for json.Number, see #395
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Apr 23, 2024
1 parent d004a7f commit 4ec9987
Show file tree
Hide file tree
Showing 20 changed files with 201 additions and 126 deletions.
4 changes: 3 additions & 1 deletion client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package client
import (
"encoding/json"
"fmt"

"github.com/contribsys/faktory/util"
)

type BatchStatus struct {
Expand Down Expand Up @@ -176,7 +178,7 @@ func (c *Client) BatchStatus(bid string) (*BatchStatus, error) {
}

var stat BatchStatus
err = json.Unmarshal(data, &stat)
err = util.JsonUnmarshal(data, &stat)
if err != nil {
return nil, err
}
Expand Down
85 changes: 44 additions & 41 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/contribsys/faktory/internal/pool"
"github.com/contribsys/faktory/util"
)

const (
Expand Down Expand Up @@ -152,7 +153,7 @@ func DefaultServer() *Server {
//
// Use the URL to configure any necessary password:
//
// tcp://:mypassword@localhost:7419
// tcp://:mypassword@localhost:7419
//
// By default Open assumes localhost with no password
// which is appropriate for local development.
Expand Down Expand Up @@ -182,8 +183,7 @@ func OpenWithDialer(dialer Dialer) (*Client, error) {
// a *tls.Dialer if "tcp+tls" and a *net.Dialer if
// not.
//
// client.Dial(client.Localhost, "topsecret")
//
// client.Dial(client.Localhost, "topsecret")
func Dial(srv *Server, password string) (*Client, error) {
d := &net.Dialer{Timeout: srv.Timeout}
dialer := Dialer(d)
Expand All @@ -198,6 +198,12 @@ func DialWithDialer(srv *Server, password string, dialer Dialer) (*Client, error
return dial(srv, password, dialer)
}

type HIv2 struct {
V int `json:"v"` // version, should be 2
I int `json:"i,omitempty"` // iterations
S string `json:"s,omitempty"` // salt
}

// dial connects to the remote faktory server.
func dial(srv *Server, password string, dialer Dialer) (*Client, error) {
client := emptyClientData()
Expand Down Expand Up @@ -227,27 +233,19 @@ func dial(srv *Server, password string, dialer Dialer) (*Client, error) {
if strings.HasPrefix(line, "HI ") {
str := strings.TrimSpace(line)[3:]

var hi map[string]interface{}
err = json.Unmarshal([]byte(str), &hi)
var hi HIv2
err = util.JsonUnmarshal([]byte(str), &hi)
if err != nil {
conn.Close()
return nil, err
}
v, ok := hi["v"].(float64)
if ok {
if ExpectedProtocolVersion != int(v) {
fmt.Println("Warning: server and client protocol versions out of sync:", v, ExpectedProtocolVersion)
}
if ExpectedProtocolVersion != hi.V {
util.Infof("Warning: server and client protocol versions out of sync: want %d, got %d", ExpectedProtocolVersion, hi.V)
}

salt, ok := hi["s"].(string)
if ok {
iter := 1
iterVal, ok := hi["i"]
if ok {
iter = int(iterVal.(float64))
}

salt := hi.S
if salt != "" {
iter := hi.I
client.PasswordHash = hash(password, salt, iter)
}
} else {
Expand Down Expand Up @@ -303,7 +301,7 @@ func (c *Client) PushBulk(jobs []*Job) (map[string]string, error) {
return nil, err
}
results := map[string]string{}
err = json.Unmarshal(data, &results)
err = util.JsonUnmarshal(data, &results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -342,7 +340,7 @@ func (c *Client) Fetch(q ...string) (*Job, error) {
}

var job Job
err = json.Unmarshal(data, &job)
err = util.JsonUnmarshal(data, &job)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -421,7 +419,11 @@ func (c *Client) ResumeQueues(names ...string) error {
return c.ok(c.rdr)
}

// deprecated, this returns an untyped map.
// use CurrentState() instead which provides strong typing
func (c *Client) Info() (map[string]interface{}, error) {
util.Info("client.Info() is deprecated, use client.CurrentState() instead")

err := c.writeLine(c.wtr, "INFO", nil)
if err != nil {
return nil, err
Expand All @@ -435,42 +437,43 @@ func (c *Client) Info() (map[string]interface{}, error) {
return nil, nil
}

var hash map[string]interface{}
err = json.Unmarshal(data, &hash)
var cur map[string]interface{}
err = util.JsonUnmarshal(data, &cur)
if err != nil {
return nil, err
}

return hash, nil
return cur, nil
}

func (c *Client) QueueSizes() (map[string]uint64, error) {
hash, err := c.Info()
func (c *Client) CurrentState() (*FaktoryState, error) {
err := c.writeLine(c.wtr, "INFO", nil)
if err != nil {
return nil, err
}

faktory, ok := hash["faktory"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid info hash: %s", hash)
data, err := c.readResponse(c.rdr)
if err != nil {
return nil, err
}

queues, ok := faktory["queues"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid info hash: %s", hash)
if len(data) == 0 {
return nil, nil
}

sizes := make(map[string]uint64)
for name, size := range queues {
size, ok := size.(float64)
if !ok {
return nil, fmt.Errorf("invalid queue size: %v", size)
}

sizes[name] = uint64(size)
var cur FaktoryState
err = util.JsonUnmarshal(data, &cur)
if err != nil {
return nil, err
}
return &cur, nil
}

return sizes, nil
func (c *Client) QueueSizes() (map[string]uint64, error) {
state, err := c.CurrentState()
if err != nil {
return nil, err
}
return state.Data.Queues, nil
}

func (c *Client) Generic(cmdline string) (string, error) {
Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ func TestClientOperations(t *testing.T) {
resp <- "$36\r\n{\"faktory\":{\"queues\":{\"default\":2}}}\r\n"
sizes, err := cl.QueueSizes()
assert.NoError(t, err)
assert.Equal(t, sizes["default"], uint64(2))
assert.EqualValues(t, 2, sizes["default"])
assert.Contains(t, <-req, "INFO")

resp <- "$39\r\n{\"faktory\":{\"queues\":{\"invalid\":null}}}\r\n"
sizes, err = cl.QueueSizes()
assert.Error(t, err)
assert.Nil(t, sizes)
assert.NoError(t, err)
assert.EqualValues(t, 0, sizes["invalid"])
assert.Contains(t, <-req, "INFO")

err = cl.Close()
Expand Down
27 changes: 27 additions & 0 deletions client/faktory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,30 @@ var (
Name = "Faktory"
Version = "1.9.0"
)

// Structs for parsing the INFO response
type FaktoryState struct {
Now string `json:"now"`
ServerUtcTime string `json:"server_utc_time"`
Data DataSnapshot `json:"faktory"`
Server ServerSnapshot `json:"server"`
}

type DataSnapshot struct {
TotalFailures uint64 `json:"total_failures"`
TotalProcessed uint64 `json:"total_processed"`
TotalEnqueued uint64 `json:"total_enqueued"`
TotalQueues uint64 `json:"total_queues"`
Queues map[string]uint64 `json:"queues"`
Sets map[string]uint64 `json:"sets"`
Tasks map[string]map[string]interface{} `json:"tasks"` // deprecated
}

type ServerSnapshot struct {
Description string `json:"description"`
Version string `json:"faktory_version"`
Uptime uint64 `json:"uptime"`
Connections uint64 `json:"connections"`
CommandCount uint64 `json:"command_count"`
UsedMemoryMB uint64 `json:"used_memory_mb"`
}
2 changes: 1 addition & 1 deletion client/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *Client) TrackGet(jid string) (*JobTrack, error) {
}

var trck JobTrack
err = json.Unmarshal(data, &trck)
err = util.JsonUnmarshal(data, &trck)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions manager/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package manager

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -158,7 +157,7 @@ func (el *simpleLease) Job() (*client.Job, error) {
}
if el.job == nil {
var job client.Job
err := json.Unmarshal(el.payload, &job)
err := util.JsonUnmarshal(el.payload, &job)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal job payload: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions manager/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package manager

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -38,7 +37,7 @@ func (m *manager) schedule(ctx context.Context, when time.Time, set storage.Sort
for {
count, err := set.RemoveBefore(ctx, util.Thens(when), 100, func(data []byte) error {
var job client.Job
if err := json.Unmarshal(data, &job); err != nil {
if err := util.JsonUnmarshal(data, &job); err != nil {
return fmt.Errorf("cannot unmarshal job payload: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions manager/working.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *manager) loadWorkingSet(ctx context.Context) error {
addedCount := 0
err := m.store.Working().Each(ctx, func(idx int, entry storage.SortedEntry) error {
var res Reservation
err := json.Unmarshal(entry.Value(), &res)
err := util.JsonUnmarshal(entry.Value(), &res)
if err != nil {
// We can't return an error here, this method is best effort
// as we are booting the server. We can't allow corrupted data
Expand Down Expand Up @@ -193,7 +193,7 @@ func (m *manager) ReapExpiredJobs(ctx context.Context, when time.Time) (int64, e
tm := util.Thens(when)
count, err := m.store.Working().RemoveBefore(ctx, tm, 10, func(data []byte) error {
var res Reservation
err := json.Unmarshal(data, &res)
err := util.JsonUnmarshal(data, &res)
if err != nil {
return fmt.Errorf("cannot unmarshal reservation payload: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions server/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func pushBulk(c *Connection, s *Server, cmd string) {
data := cmd[6:]
jobs := make([]client.Job, 0)

err := json.Unmarshal([]byte(data), &jobs)
err := util.JsonUnmarshal([]byte(data), &jobs)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid JSON: %w", err))
return
Expand Down Expand Up @@ -147,7 +147,7 @@ func push(c *Connection, s *Server, cmd string) {
var job client.Job
job.Retry = &client.RetryPolicyDefault

err := json.Unmarshal([]byte(data), &job)
err := util.JsonUnmarshal([]byte(data), &job)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid JSON: %w", err))
return
Expand Down Expand Up @@ -201,7 +201,7 @@ func ack(c *Connection, s *Server, cmd string) {
data := cmd[4:]

var hash map[string]string
err := json.Unmarshal([]byte(data), &hash)
err := util.JsonUnmarshal([]byte(data), &hash)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid ACK %s", data))
return
Expand All @@ -225,7 +225,7 @@ func fail(c *Connection, s *Server, cmd string) {
data := cmd[5:]

var failure manager.FailPayload
err := json.Unmarshal([]byte(data), &failure)
err := util.JsonUnmarshal([]byte(data), &failure)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid FAIL %s", data))
return
Expand Down Expand Up @@ -266,7 +266,7 @@ func heartbeat(c *Connection, s *Server, cmd string) {
data := cmd[5:]

var beat ClientBeat
err := json.Unmarshal([]byte(data), &beat)
err := util.JsonUnmarshal([]byte(data), &beat)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid BEAT %s", data))
return
Expand Down
4 changes: 2 additions & 2 deletions server/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package server

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/contribsys/faktory/client"
"github.com/contribsys/faktory/manager"
"github.com/contribsys/faktory/storage"
"github.com/contribsys/faktory/util"
)

var (
Expand Down Expand Up @@ -119,7 +119,7 @@ func mutate(c *Connection, s *Server, cmd string) {

var err error
var op client.Operation
err = json.Unmarshal([]byte(parts[1]), &op)
err = util.JsonUnmarshal([]byte(parts[1]), &op)
if err != nil {
_ = c.Error(cmd, err)
return
Expand Down
Loading

0 comments on commit 4ec9987

Please sign in to comment.