From 0d255fc217973fca297643e7cb98a3464a7439dd Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Thu, 5 Dec 2024 21:10:40 +1100 Subject: [PATCH 1/3] add additional conditions for extendedContext Signed-off-by: NikitaSkrynnik --- extendtimeout/connection.go | 27 +++++++++++- extendtimeout/connection_test.go | 76 +++++++++++++++++++++++++++----- 2 files changed, 89 insertions(+), 14 deletions(-) diff --git a/extendtimeout/connection.go b/extendtimeout/connection.go index 288a872..6b9d1c5 100644 --- a/extendtimeout/connection.go +++ b/extendtimeout/connection.go @@ -54,15 +54,38 @@ func (c *extendedConnection) Invoke(ctx context.Context, req, reply api.Message) return err } +func (c *extendedConnection) cancelMonitorCtx(ctx context.Context) (cancelMonitorCtx context.Context, cancel func()) { + cancelCtx, cancelFunc := context.WithCancel(context.Background()) + cancelCh := make(chan struct{}) + go func() { + select { + case <-time.After(c.contextTimeout): + <-ctx.Done() + case <-cancelCh: + } + cancelFunc() + }() + + cancelMonitorCtx = &extendedContext{ + Context: cancelCtx, + valuesContext: ctx, + } + cancel = func() { + cancelCh <- struct{}{} + close(cancelCh) + } + return +} + func (c *extendedConnection) withExtendedTimeoutCtx(ctx context.Context) (extendedCtx context.Context, cancel func()) { deadline, ok := ctx.Deadline() if !ok { - return ctx, func() {} + return c.cancelMonitorCtx(ctx) } minDeadline := time.Now().Add(c.contextTimeout) if minDeadline.Before(deadline) { - return ctx, func() {} + return c.cancelMonitorCtx(ctx) } log.Entry(ctx).Warnf("Context deadline has been extended by extendtimeout from %v to %v", deadline, minDeadline) deadline = minDeadline diff --git a/extendtimeout/connection_test.go b/extendtimeout/connection_test.go index 4d0025e..ba943d4 100644 --- a/extendtimeout/connection_test.go +++ b/extendtimeout/connection_test.go @@ -62,10 +62,8 @@ func TestSmallTimeout(t *testing.T) { func TestBigTimeout(t *testing.T) { testConn := &testConn{invokeBody: func(ctx context.Context) { - deadline, ok := ctx.Deadline() - require.True(t, ok) - timeout := time.Until(deadline) - require.Greater(t, timeout, 7*time.Second) + _, ok := ctx.Deadline() + require.False(t, ok) require.Equal(t, ctx.Value(&key{}), value) }} @@ -77,18 +75,39 @@ func TestBigTimeout(t *testing.T) { require.NoError(t, err) } -func TestNoTimeout(t *testing.T) { +func TestOriginalContextExtendedAndCanceled(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + counter := new(atomic.Int32) + ch := make(chan struct{}, 1) + defer close(ch) testConn := &testConn{invokeBody: func(ctx context.Context) { - _, ok := ctx.Deadline() - require.False(t, ok) + select { + case <-ctx.Done(): + return + case <-ch: + counter.Add(1) + } }} - emptyCtx := context.Background() - err := extendtimeout.NewConnection(testConn, 2*time.Second).Invoke(emptyCtx, nil, nil) - require.NoError(t, err) + cancelCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + go func() { + err := extendtimeout.NewConnection(testConn, 20*time.Second).Invoke(cancelCtx, nil, nil) + require.NoError(t, err) + }() + + cancel() + time.Sleep(50 * time.Millisecond) + ch <- struct{}{} + + require.Eventually(t, func() bool { + return counter.Load() == 1 + }, time.Second, 100*time.Millisecond) } -func TestCanceledContext(t *testing.T) { +func TestOriginalContextNotExtendedAndCanceled(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) @@ -107,11 +126,44 @@ func TestCanceledContext(t *testing.T) { cancelCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) go func() { - err := extendtimeout.NewConnection(testConn, 20*time.Second).Invoke(cancelCtx, nil, nil) + err := extendtimeout.NewConnection(testConn, 5*time.Second).Invoke(cancelCtx, nil, nil) + require.NoError(t, err) + }() + + cancel() + time.Sleep(50 * time.Millisecond) + ch <- struct{}{} + + require.Eventually(t, func() bool { + return counter.Load() == 1 + }, time.Second, 100*time.Millisecond) +} + +func TestOriginalContextNoTimeoutAndCanceled(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + counter := new(atomic.Int32) + ch := make(chan struct{}, 1) + defer close(ch) + testConn := &testConn{invokeBody: func(ctx context.Context) { + select { + case <-ctx.Done(): + return + case <-ch: + counter.Add(1) + } + }} + + cancelCtx, cancel := context.WithCancel(context.Background()) + go func() { + err := extendtimeout.NewConnection(testConn, 10*time.Second).Invoke(cancelCtx, nil, nil) require.NoError(t, err) }() cancel() + time.Sleep(50 * time.Millisecond) ch <- struct{}{} require.Eventually(t, func() bool { From 696b96192ca65b126040d7c264a3991f0c7c88dd Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Thu, 5 Dec 2024 21:21:05 +1100 Subject: [PATCH 2/3] fix golang ci Signed-off-by: NikitaSkrynnik --- extendtimeout/connection_test.go | 145 ++++++++++++------------------- 1 file changed, 57 insertions(+), 88 deletions(-) diff --git a/extendtimeout/connection_test.go b/extendtimeout/connection_test.go index ba943d4..4c0bba2 100644 --- a/extendtimeout/connection_test.go +++ b/extendtimeout/connection_test.go @@ -75,98 +75,67 @@ func TestBigTimeout(t *testing.T) { require.NoError(t, err) } -func TestOriginalContextExtendedAndCanceled(t *testing.T) { - t.Cleanup(func() { - goleak.VerifyNone(t) - }) - - counter := new(atomic.Int32) - ch := make(chan struct{}, 1) - defer close(ch) - testConn := &testConn{invokeBody: func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-ch: - counter.Add(1) - } - }} - - cancelCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - go func() { - err := extendtimeout.NewConnection(testConn, 20*time.Second).Invoke(cancelCtx, nil, nil) - require.NoError(t, err) - }() - - cancel() - time.Sleep(50 * time.Millisecond) - ch <- struct{}{} - - require.Eventually(t, func() bool { - return counter.Load() == 1 - }, time.Second, 100*time.Millisecond) -} - -func TestOriginalContextNotExtendedAndCanceled(t *testing.T) { - t.Cleanup(func() { - goleak.VerifyNone(t) - }) - - counter := new(atomic.Int32) - ch := make(chan struct{}, 1) - defer close(ch) - testConn := &testConn{invokeBody: func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-ch: - counter.Add(1) - } - }} - - cancelCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - go func() { - err := extendtimeout.NewConnection(testConn, 5*time.Second).Invoke(cancelCtx, nil, nil) - require.NoError(t, err) - }() - - cancel() - time.Sleep(50 * time.Millisecond) - ch <- struct{}{} - - require.Eventually(t, func() bool { - return counter.Load() == 1 - }, time.Second, 100*time.Millisecond) +func TestOriginalContextCanceled(t *testing.T) { + testCases := []struct { + desc string + originalTimeout time.Duration + extendedTimeout time.Duration + }{ + { + desc: "Extended", + originalTimeout: 10 * time.Second, + extendedTimeout: 20 * time.Second, + }, + { + desc: "NotExtended", + originalTimeout: 10 * time.Second, + extendedTimeout: 5 * time.Second, + }, + { + desc: "WithoutTimeout", + originalTimeout: -1, + extendedTimeout: 10 * time.Second, + }, + } + for _, testCase := range testCases { + t.Run(testCase.desc, testBody(testCase.originalTimeout, testCase.extendedTimeout)) + } } -func TestOriginalContextNoTimeoutAndCanceled(t *testing.T) { - t.Cleanup(func() { - goleak.VerifyNone(t) - }) - - counter := new(atomic.Int32) - ch := make(chan struct{}, 1) - defer close(ch) - testConn := &testConn{invokeBody: func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-ch: - counter.Add(1) +func testBody(originalTimeout, extendedTimeout time.Duration) func(t *testing.T) { + return func(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + counter := new(atomic.Int32) + ch := make(chan struct{}, 1) + defer close(ch) + testConn := &testConn{invokeBody: func(ctx context.Context) { + select { + case <-ctx.Done(): + return + case <-ch: + counter.Add(1) + } + }} + + cancelCtx, cancel := context.WithTimeout(context.Background(), originalTimeout) + if originalTimeout < 0 { + cancelCtx, cancel = context.WithCancel(context.Background()) } - }} - cancelCtx, cancel := context.WithCancel(context.Background()) - go func() { - err := extendtimeout.NewConnection(testConn, 10*time.Second).Invoke(cancelCtx, nil, nil) - require.NoError(t, err) - }() + go func() { + err := extendtimeout.NewConnection(testConn, extendedTimeout).Invoke(cancelCtx, nil, nil) + require.NoError(t, err) + }() - cancel() - time.Sleep(50 * time.Millisecond) - ch <- struct{}{} + cancel() + time.Sleep(50 * time.Millisecond) + ch <- struct{}{} - require.Eventually(t, func() bool { - return counter.Load() == 1 - }, time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { + return counter.Load() == 1 + }, time.Second, 100*time.Millisecond) + } } From e86690de193e8c02cf0043c34c7c8b8d425b286d Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Fri, 6 Dec 2024 16:48:17 +1100 Subject: [PATCH 3/3] refactoring + remove some tests Signed-off-by: NikitaSkrynnik --- extendtimeout/connection.go | 58 ++++------------ extendtimeout/connection_test.go | 116 +++++++------------------------ 2 files changed, 36 insertions(+), 138 deletions(-) diff --git a/extendtimeout/connection.go b/extendtimeout/connection.go index 6b9d1c5..8c4fc4f 100644 --- a/extendtimeout/connection.go +++ b/extendtimeout/connection.go @@ -21,7 +21,6 @@ import ( "context" "time" - "github.com/edwarnicke/log" "go.fd.io/govpp/api" ) @@ -30,15 +29,6 @@ type extendedConnection struct { contextTimeout time.Duration } -type extendedContext struct { - context.Context - valuesContext context.Context -} - -func (ec *extendedContext) Value(key interface{}) interface{} { - return ec.valuesContext.Value(key) -} - // NewConnection - creates a wrapper for vpp connection that uses extended context timeout for all operations func NewConnection(vppConn api.Connection, contextTimeout time.Duration) api.Connection { return &extendedConnection{ @@ -48,50 +38,26 @@ func NewConnection(vppConn api.Connection, contextTimeout time.Duration) api.Con } func (c *extendedConnection) Invoke(ctx context.Context, req, reply api.Message) error { - ctx, cancel := c.withExtendedTimeoutCtx(ctx) + ctx, cancel := c.withExtendedTimeoutContext(ctx) err := c.Connection.Invoke(ctx, req, reply) cancel() return err } -func (c *extendedConnection) cancelMonitorCtx(ctx context.Context) (cancelMonitorCtx context.Context, cancel func()) { - cancelCtx, cancelFunc := context.WithCancel(context.Background()) - cancelCh := make(chan struct{}) +func (c *extendedConnection) withExtendedTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) { + var cancelContext, cancel = context.WithCancel(context.Background()) + var timeoutContext, timeoutCancel = context.WithTimeout(cancelContext, c.contextTimeout) go func() { + <-timeoutContext.Done() + timeoutCancel() select { - case <-time.After(c.contextTimeout): - <-ctx.Done() - case <-cancelCh: + case <-cancelContext.Done(): + return + case <-ctx.Done(): + cancel() + return } - cancelFunc() }() - cancelMonitorCtx = &extendedContext{ - Context: cancelCtx, - valuesContext: ctx, - } - cancel = func() { - cancelCh <- struct{}{} - close(cancelCh) - } - return -} - -func (c *extendedConnection) withExtendedTimeoutCtx(ctx context.Context) (extendedCtx context.Context, cancel func()) { - deadline, ok := ctx.Deadline() - if !ok { - return c.cancelMonitorCtx(ctx) - } - - minDeadline := time.Now().Add(c.contextTimeout) - if minDeadline.Before(deadline) { - return c.cancelMonitorCtx(ctx) - } - log.Entry(ctx).Warnf("Context deadline has been extended by extendtimeout from %v to %v", deadline, minDeadline) - deadline = minDeadline - postponedCtx, cancel := context.WithDeadline(context.Background(), deadline) - return &extendedContext{ - Context: postponedCtx, - valuesContext: ctx, - }, cancel + return cancelContext, cancel } diff --git a/extendtimeout/connection_test.go b/extendtimeout/connection_test.go index 4c0bba2..1ce342f 100644 --- a/extendtimeout/connection_test.go +++ b/extendtimeout/connection_test.go @@ -39,103 +39,35 @@ func (c *testConn) Invoke(ctx context.Context, req, reply api.Message) error { return nil } -type key struct{} - -const value = "value" - -func TestSmallTimeout(t *testing.T) { - testConn := &testConn{invokeBody: func(ctx context.Context) { - deadline, ok := ctx.Deadline() - require.True(t, ok) - timeout := time.Until(deadline) - require.Greater(t, timeout, time.Second) - require.Equal(t, ctx.Value(&key{}), value) - }} - - smallCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) - smallCtx = context.WithValue(smallCtx, &key{}, value) - defer cancel() - - err := extendtimeout.NewConnection(testConn, 2*time.Second).Invoke(smallCtx, nil, nil) - require.NoError(t, err) -} +func TestOriginalContextCanceled(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) -func TestBigTimeout(t *testing.T) { + counter := new(atomic.Int32) + ch := make(chan struct{}, 1) + defer close(ch) testConn := &testConn{invokeBody: func(ctx context.Context) { - _, ok := ctx.Deadline() - require.False(t, ok) - require.Equal(t, ctx.Value(&key{}), value) + select { + case <-ctx.Done(): + return + case <-ch: + counter.Add(1) + } }} - bigCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - bigCtx = context.WithValue(bigCtx, &key{}, value) - defer cancel() - - err := extendtimeout.NewConnection(testConn, 2*time.Second).Invoke(bigCtx, nil, nil) - require.NoError(t, err) -} - -func TestOriginalContextCanceled(t *testing.T) { - testCases := []struct { - desc string - originalTimeout time.Duration - extendedTimeout time.Duration - }{ - { - desc: "Extended", - originalTimeout: 10 * time.Second, - extendedTimeout: 20 * time.Second, - }, - { - desc: "NotExtended", - originalTimeout: 10 * time.Second, - extendedTimeout: 5 * time.Second, - }, - { - desc: "WithoutTimeout", - originalTimeout: -1, - extendedTimeout: 10 * time.Second, - }, - } - for _, testCase := range testCases { - t.Run(testCase.desc, testBody(testCase.originalTimeout, testCase.extendedTimeout)) - } -} - -func testBody(originalTimeout, extendedTimeout time.Duration) func(t *testing.T) { - return func(t *testing.T) { - t.Cleanup(func() { - goleak.VerifyNone(t) - }) - - counter := new(atomic.Int32) - ch := make(chan struct{}, 1) - defer close(ch) - testConn := &testConn{invokeBody: func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-ch: - counter.Add(1) - } - }} - - cancelCtx, cancel := context.WithTimeout(context.Background(), originalTimeout) - if originalTimeout < 0 { - cancelCtx, cancel = context.WithCancel(context.Background()) - } + cancelCtx, cancel := context.WithCancel(context.Background()) - go func() { - err := extendtimeout.NewConnection(testConn, extendedTimeout).Invoke(cancelCtx, nil, nil) - require.NoError(t, err) - }() + go func() { + err := extendtimeout.NewConnection(testConn, 10*time.Second).Invoke(cancelCtx, nil, nil) + require.NoError(t, err) + }() - cancel() - time.Sleep(50 * time.Millisecond) - ch <- struct{}{} + cancel() + time.Sleep(50 * time.Millisecond) + ch <- struct{}{} - require.Eventually(t, func() bool { - return counter.Load() == 1 - }, time.Second, 100*time.Millisecond) - } + require.Eventually(t, func() bool { + return counter.Load() == 1 + }, 200*time.Millisecond, 10*time.Millisecond) }