Skip to content

Commit

Permalink
feat: add async.SingleSender and a new docker resource for mysql (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Feb 23, 2024
1 parent 1640424 commit 063ec10
Show file tree
Hide file tree
Showing 9 changed files with 433 additions and 7 deletions.
52 changes: 52 additions & 0 deletions async/single_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package async

import (
"context"
)

// SingleSender is a helper for sending and receiving values to and from a channel between 2 separate goroutines, a sending and a receiving goroutine, while at the same time supporting the following scenarios:
// 1. The sending goroutine in case the parent context is canceled should be able to notify the receiver goroutine about the error through the channel.
// 2. The receiving goroutine should be able to stop listening from the channel (a.k.a. leave) at any point.
// 3. The sending goroutine shouldn't be blocked trying to send to the channel when the receiver has left it.
// 4. Receiver's departure should act as a context cancellation signal to the sending goroutine, i.e. it should stop working.
type SingleSender[T any] struct {
ctx context.Context
ctxCancel context.CancelFunc
sendCtx context.Context
sendCtxCancel context.CancelFunc
ch chan T
closed bool
}

// Begin creates a new channel and returns it along with a context for the sending goroutine to use and a function for the receiving goroutine to be able to leave the "conversation" if needed.
func (s *SingleSender[T]) Begin(parentCtx context.Context) (ctx context.Context, ch <-chan T, leave func()) {
s.ctx, s.ctxCancel = context.WithCancel(parentCtx)
s.ch = make(chan T)
s.sendCtx, s.sendCtxCancel = context.WithCancel(context.Background())
return s.ctx, s.ch, s.sendCtxCancel
}

// Send tries to send a value to the channel. If the channel is closed, or the receiving goroutine has left it does nothing.
func (s *SingleSender[T]) Send(value T) {
closed := s.closed
if closed { // don't send to a closed channel
return
}
select {
case <-s.sendCtx.Done():
s.ctxCancel()
return
case s.ch <- value:
}
}

// Close the channel and cancel all related contexts.
func (s *SingleSender[T]) Close() {
if s.closed {
return
}
s.closed = true
s.ctxCancel()
s.sendCtxCancel()
close(s.ch)
}
222 changes: 222 additions & 0 deletions async/single_sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package async_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/async"
)

func TestSingleSender(t *testing.T) {
type valueOrError struct {
value int
err error
}

send := func(ctx context.Context, s *async.SingleSender[valueOrError], times int) (sendCalls int) {
defer s.Close()
for i := 0; i < times; i++ {
if ctx.Err() != nil {
s.Send(valueOrError{err: ctx.Err()})
return
}
s.Send(valueOrError{value: i})
sendCalls++
}
return sendCalls
}

receive := func(ch <-chan valueOrError, delay time.Duration) ([]int, []error) {
var receivedValues []int
var receivedErrors []error
for v := range ch {
time.Sleep(delay)
if v.err != nil {
receivedErrors = append(receivedErrors, v.err)
} else {
receivedValues = append(receivedValues, v.value)
}
}
return receivedValues, receivedErrors
}

t.Run("receive all values from sender", func(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
s := &async.SingleSender[valueOrError]{}
ctx, ch, _ := s.Begin(context.Background())
defer s.Close()

g := &errgroup.Group{}

var sendCalls int
g.Go(func() error {
sendCalls = send(ctx, s, 10)
return nil
})

var receivedValues []int
var receivedErrors []error
g.Go(func() error {
receivedValues, receivedErrors = receive(ch, 0)
return nil
})

_ = g.Wait()

require.Equal(t, 10, sendCalls)
require.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, receivedValues)
require.Empty(t, receivedErrors)
})

t.Run("parent context is canceled", func(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
parentCtx, parentCtxCancel := context.WithCancel(context.Background())
parentCtxCancel()
s := &async.SingleSender[valueOrError]{}
ctx, ch, _ := s.Begin(parentCtx)
defer s.Close()

g := &errgroup.Group{}

var sendCalls int
g.Go(func() error {
sendCalls = send(ctx, s, 10)
return nil
})

var receivedValues []int
var receivedErrors []error

g.Go(func() error {
receivedValues, receivedErrors = receive(ch, 0)
return nil
})
_ = g.Wait()

require.Zero(t, sendCalls)
require.Nil(t, receivedValues, "no values should be received")
require.Equal(t, []error{context.Canceled}, receivedErrors)
})

t.Run("parent context is canceled after interaction has started", func(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
parentCtx, parentCtxCancel := context.WithCancel(context.Background())

s := &async.SingleSender[valueOrError]{}
ctx, ch, _ := s.Begin(parentCtx)
defer s.Close()

g := &errgroup.Group{}

var sendCalls int
g.Go(func() error {
sendCalls = send(ctx, s, 1000)
return nil
})

var receivedValues []int
var receivedErrors []error

g.Go(func() error {
receivedValues, receivedErrors = receive(ch, 10*time.Millisecond)
return nil
})
time.Sleep(time.Millisecond * 100)
parentCtxCancel()
_ = g.Wait()

require.GreaterOrEqual(t, sendCalls, 1, "sender should have called send at least for 1 value")
require.GreaterOrEqual(t, len(receivedValues), 1, "receiver should have called received at least for 1 value")
require.Equal(t, []error{context.Canceled}, receivedErrors)
})

t.Run("try to send another value after sender is closed", func(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
s := async.SingleSender[valueOrError]{}
_, ch, _ := s.Begin(context.Background())
defer s.Close()

g := &errgroup.Group{}

g.Go(func() error {
for i := 0; i < 10; i++ {
s.Send(valueOrError{value: i})
}
s.Close()
s.Send(valueOrError{value: 10})
return nil
})

var receivedValues []int
var receivedErrors []error

g.Go(func() error {
receivedValues, receivedErrors = receive(ch, 0)
return nil
})
_ = g.Wait()

require.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, receivedValues)
require.Empty(t, receivedErrors)
})

t.Run("receiver leaves before sender sends all values", func(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
s := async.SingleSender[valueOrError]{}
ctx, ch, leave := s.Begin(context.Background())
defer s.Close()

g := &errgroup.Group{}

var sendCalls int
g.Go(func() error {
sendCalls = send(ctx, &s, 10)
return nil
})

var receivedValues []int
var receivedErrors []error

g.Go(func() error {
for v := range ch {
if v.err != nil {
receivedErrors = append(receivedErrors, v.err)
} else {
receivedValues = append(receivedValues, v.value)
}
// leave after receiving 1 value
leave()
// make sure sender has time to try and send another value and figure out that context is canceled
time.Sleep(100 * time.Millisecond)
}
return nil
})
_ = g.Wait()

require.GreaterOrEqual(t, len(receivedValues), 1, "receiver should have received at least 1 value")
require.LessOrEqual(t, len(receivedValues), 2, "receiver should have received at most 2 values")
require.GreaterOrEqual(t, sendCalls, 1, "sender should have called send at least for 1 value")
require.LessOrEqual(t, sendCalls, 2, "sender should have called send at most for 2 values, i.e. it should stop sending after receiver leaves")
})

t.Run("sender closes then sends again", func(t *testing.T) {
s := async.SingleSender[valueOrError]{}
_, ch, _ := s.Begin(context.Background())
go func() {
s.Close()
s.Send(valueOrError{value: 1})
}()

var values []int
for range ch {
values = append(values, 1)
}

require.Empty(t, values)
})
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
go.opentelemetry.io/otel/sdk v1.23.1
go.opentelemetry.io/otel/sdk/metric v1.23.1
go.opentelemetry.io/otel/trace v1.23.1
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.19.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
Expand Down
20 changes: 20 additions & 0 deletions testhelper/docker/resource/mysql/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mysql

type Opt func(*Config)

func WithTag(tag string) Opt {
return func(c *Config) {
c.Tag = tag
}
}

func WithShmSize(shmSize int64) Opt {
return func(c *Config) {
c.ShmSize = shmSize
}
}

type Config struct {
Tag string
ShmSize int64
}
88 changes: 88 additions & 0 deletions testhelper/docker/resource/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package mysql

import (
"bytes"
_ "encoding/json"
"fmt"

"github.com/ory/dockertest/v3"
dc "github.com/ory/dockertest/v3/docker"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
)

const (
defaultDB = "sources"
defaultUser = "root"
defaultPassword = "password"
)

type Resource struct {
DBDsn string
Database string
Password string
User string
Host string
Port string
}

func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(*Config)) (*Resource, error) {
c := &Config{
Tag: "8.2",
}
for _, opt := range opts {
opt(c)
}

// pulls an image, creates a container based on it and runs it
mysqlContainer, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "mysql",
Tag: c.Tag,
Env: []string{
"MYSQL_ROOT_PASSWORD=" + defaultPassword,
"MYSQL_DATABASE=" + defaultDB,
},
}, func(hc *dc.HostConfig) {
hc.ShmSize = c.ShmSize
})
if err != nil {
return nil, fmt.Errorf("running container: %w", err)
}

d.Cleanup(func() {
if err := pool.Purge(mysqlContainer); err != nil {
d.Log("Could not purge resource:", err)
}
})

dbDSN := fmt.Sprintf("%s:%s@tcp(127.0.0.1:%s)/%s?tls=false",
defaultUser, defaultPassword, mysqlContainer.GetPort("3306/tcp"), defaultDB,
)
// exponential backoff-retry, because the application in the container might not be ready to accept connections yet
err = pool.Retry(func() (err error) {
var w bytes.Buffer
code, err := mysqlContainer.Exec([]string{
"bash",
"-c",
"mysqladmin ping -h 127.0.0.1 --silent",
}, dockertest.ExecOptions{StdOut: &w, StdErr: &w})
if err != nil {
return err
}
if code != 0 {
return fmt.Errorf("mysql not ready:\n%s" + w.String())
}
return nil
})
if err != nil {
return nil, fmt.Errorf("pinging container: %w", err)
}
return &Resource{
DBDsn: dbDSN,
Database: defaultDB,
User: defaultUser,
Password: defaultPassword,
Host: "localhost",
Port: mysqlContainer.GetPort("3306/tcp"),
}, nil
}
Loading

0 comments on commit 063ec10

Please sign in to comment.