Skip to content

Commit

Permalink
Encapsulated and mocked the external dependency
Browse files Browse the repository at this point in the history
Reason: Pave the way for unit tests
  • Loading branch information
alok87 committed Jun 15, 2020
1 parent 3b73913 commit 79434d6
Show file tree
Hide file tree
Showing 13 changed files with 1,496 additions and 54 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.14
require (
github.com/aws/aws-sdk-go v1.29.15
github.com/beanstalkd/go-beanstalk v0.0.0-20200526060843-1cc502ecaf3c
github.com/golang/mock v1.4.3
github.com/mitchellh/go-homedir v1.1.0
github.com/prometheus/client_golang v0.9.3
github.com/spf13/cobra v0.0.5
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -312,6 +315,10 @@ modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I=
rsc.io/quote/v3 v3.1.0 h1:9JKUTTIUgS6kzR9mK1YuGKv6Nl+DijDNIc0ghT58FaY=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0 h1:7uVkIFmeBqHfdjD+gZwtXXI+RODJ2Wc4O7MPEh/QiW4=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/structured-merge-diff v0.0.0-20190302045857-e85c7b244fd2/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
145 changes: 92 additions & 53 deletions pkg/queue/beanstalk.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queue

import (
"errors"
"net/url"
"path"
"strconv"
Expand All @@ -14,8 +15,8 @@ import (
// Beanstalk is used to by the Poller to get the queue
// information from Beanstalk, it implements the QueuingService interface
type Beanstalk struct {
queues *Queues
connPool *sync.Map
queues *Queues
clientPool *sync.Map

shortPollInterval time.Duration
longPollInterval int64
Expand All @@ -27,36 +28,41 @@ func NewBeanstalk(
longPollInterval int) (QueuingService, error) {

return &Beanstalk{
queues: queues,
connPool: new(sync.Map),
queues: queues,
clientPool: new(sync.Map),

shortPollInterval: time.Second * time.Duration(shortPollInterval),
longPollInterval: int64(longPollInterval),
}, nil
}

func MustParseInt(s string, base int, bitSize int) int32 {
func mustParseInt(s string, base int, bitSize int) int32 {
i, err := strconv.ParseInt(s, base, bitSize)
if err != nil {
klog.Fatalf("Error parsing int: %v", err)
}
return int32(i)
}

func MustParseUint(s string, base int, bitSize int) uint32 {
func mustParseUint(s string, base int, bitSize int) uint32 {
i, err := strconv.ParseUint(s, base, bitSize)
if err != nil {
klog.Fatalf("Error parsing int: %v", err)
}
return uint32(i)
}

func (b *Beanstalk) getConnection(queueURI string) (*beanstalk.Conn, error) {
conn, _ := b.connPool.Load(queueURI)
if conn != nil {
return conn.(*beanstalk.Conn), nil
}
type BeanstalkClientInterface interface {
getStats() (int32, int32, int32, error)
longPollReceiveMessage(longPollInterval int64) (int32, int32, error)
}

type beanstalkClient struct {
conn *beanstalk.Conn
queueURI string
}

func NewBeanstalkClient(queueURI string) (BeanstalkClientInterface, error) {
var host, port string
parsedURI, err := url.Parse(queueURI)
if err != nil {
Expand All @@ -69,87 +75,93 @@ func (b *Beanstalk) getConnection(queueURI string) (*beanstalk.Conn, error) {
port = "11300"
}

conn, err = beanstalk.Dial("tcp", host+":"+port)
if err != nil {
return nil, err
}

b.connPool.Store(queueURI, conn)

return conn.(*beanstalk.Conn), nil
}

func (b *Beanstalk) getTube(queueURI string) (*beanstalk.Tube, error) {
conn, err := b.getConnection(queueURI)
conn, err := beanstalk.Dial("tcp", host+":"+port)
if err != nil {
return nil, err
return nil, errors.New("beanstalk dial error: " + err.Error())
}

return &beanstalk.Tube{Conn: conn, Name: path.Base(queueURI)}, nil
return &beanstalkClient{conn: conn, queueURI: queueURI}, nil
}

func (b *Beanstalk) getStats(queueURI string) (int32, int32, int32, error) {
tube, err := b.getTube(queueURI)
if err != nil {
return 0, 0, 0, err
}
func (c *beanstalkClient) getStats() (int32, int32, int32, error) {
tube := &beanstalk.Tube{Conn: c.conn, Name: path.Base(c.queueURI)}

output, err := tube.Stats()
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, errors.New("beanstalk get-stats error: " + err.Error())
}

jobsWaiting := MustParseInt(output["current-jobs-ready"], 10, 32)
idleWorkers := MustParseInt(output["current-waiting"], 10, 32)
jobsReserved := MustParseInt(output["current-jobs-reserved"], 10, 32)
jobsWaiting := mustParseInt(output["current-jobs-ready"], 10, 32)
idleWorkers := mustParseInt(output["current-waiting"], 10, 32)
jobsReserved := mustParseInt(output["current-jobs-reserved"], 10, 32)
return jobsWaiting, idleWorkers, jobsReserved, nil
}

func (b *Beanstalk) longPollReceiveMessage(queueURI string) (int32, int32, error) {
conn, err := b.getConnection(queueURI)
if err != nil {
return 0, 0, err
}
func (c *beanstalkClient) longPollReceiveMessage(
longPollInterval int64) (int32, int32, error) {

tubeSet := beanstalk.NewTubeSet(conn, path.Base(queueURI))
tubeSet := beanstalk.NewTubeSet(c.conn, path.Base(c.queueURI))
id, _, err := tubeSet.Reserve(
time.Duration(b.longPollInterval) * time.Second,
time.Duration(longPollInterval) * time.Second,
)
e, ok := err.(beanstalk.ConnError)
if ok && e.Err == beanstalk.ErrTimeout {
return 0, 0, nil
}
if err != nil {
return 0, 0, err
return 0, 0, errors.New("beanstalk tube-reserve error: " + err.Error())
}

statsJob, err := conn.StatsJob(id)
statsJob, err := c.conn.StatsJob(id)
if err != nil {
return 0, 0, err
return 0, 0, errors.New("beanstalk stats-job error: " + err.Error())
}

conn.Release(id, MustParseUint(statsJob["pri"], 10, 32), 0)
c.conn.Release(id, mustParseUint(statsJob["pri"], 10, 32), 0)

return 1, 0, nil
}

// TODO: need to get this data from some source
// like: Prometheus: https://github.com/practo/beanstalkd_exporter
func (b *Beanstalk) getAverageNumberOfMessagesSent(queueURI string) (float64, error) {
return 0.0, nil
func (b *Beanstalk) getClient(
queueURI string) (BeanstalkClientInterface, error) {

client, _ := b.clientPool.Load(queueURI)
if client != nil {
return client.(BeanstalkClientInterface), nil
}

client, err := NewBeanstalkClient(queueURI)
if err != nil {
return nil, err
}
b.clientPool.Store(queueURI, client)

return client.(BeanstalkClientInterface), nil
}

func (b *Beanstalk) getApproxMessages(queueURI string) (int32, error) {
jobsWaiting, _, _, err := b.getStats(queueURI)
client, err := b.getClient(queueURI)
if err != nil {
return 0, err
}

jobsWaiting, _, _, err := client.getStats()
if err != nil {
return jobsWaiting, err
}

return jobsWaiting, nil
}

func (b *Beanstalk) getApproxMessagesNotVisible(queueURI string) (int32, error) {
_, _, jobsReserved, err := b.getStats(queueURI)
func (b *Beanstalk) getApproxMessagesNotVisible(
queueURI string) (int32, error) {

client, err := b.getClient(queueURI)
if err != nil {
return 0, err
}

_, _, jobsReserved, err := client.getStats()
if err != nil {
return jobsReserved, err
}
Expand All @@ -158,14 +170,41 @@ func (b *Beanstalk) getApproxMessagesNotVisible(queueURI string) (int32, error)
}

func (b *Beanstalk) getIdleWorkers(queueURI string) (int32, error) {
_, idleWorkers, _, err := b.getStats(queueURI)
client, err := b.getClient(queueURI)
if err != nil {
return 0, err
}

_, idleWorkers, _, err := client.getStats()
if err != nil {
return idleWorkers, err
}

return idleWorkers, nil
}

// TODO: need to get this data from some source
// like: Prometheus: https://github.com/practo/beanstalkd_exporter
func (b *Beanstalk) getAverageNumberOfMessagesSent(
queueURI string) (float64, error) {

return 0.0, nil
}

func (b *Beanstalk) longPollReceiveMessage(
queueURI string) (int32, int32, error) {

client, err := b.getClient(queueURI)
if err != nil {
return 0, 0, err
}

messages, idleWorkers, err := client.longPollReceiveMessage(
b.longPollInterval)

return messages, idleWorkers, err
}

func (b *Beanstalk) Sync(stopCh <-chan struct{}) {
for {
select {
Expand Down
66 changes: 66 additions & 0 deletions pkg/queue/beanstalk_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions pkg/queue/beanstalk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package queue

import (
_ "github.com/golang/mock/gomock"
"testing"
)

const (
queueNameOne = "q1"
)

func newTestQueues() *Queues {
q := NewQueues()

q.Add(
"namespace1",
"queuename1",
"beanstalk://beanstalkd.namespace1:11300/"+queueNameOne,
)
return q
}
12 changes: 12 additions & 0 deletions vendor/github.com/golang/mock/AUTHORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 79434d6

Please sign in to comment.