Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit to single inflight package syncing operation #289

Merged
merged 12 commits into from
Aug 29, 2024
1 change: 1 addition & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (c *httpClient) runUntilStopped(ctx context.Context) {
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
c.common.Capabilities,
&c.common.PackageSyncMutex,
)
}

Expand Down
3 changes: 3 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type ClientCommon struct {
// PackagesStateProvider provides access to the local state of packages.
PackagesStateProvider types.PackagesStateProvider

// PackageSyncMutex makes sure only one package syncing operation happens at a time.
PackageSyncMutex sync.Mutex

// The transport-specific sender.
sender Sender

Expand Down
4 changes: 3 additions & 1 deletion client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -91,10 +92,11 @@ func (h *HTTPSender) Run(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) {
h.url = url
h.callbacks = callbacks
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities)
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex)

for {
pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs)))
Expand Down
122 changes: 121 additions & 1 deletion client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
clientSyncedState := &ClientSyncedState{}
clientSyncedState.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
capabilities := protobufs.AgentCapabilities_AgentCapabilities_Unspecified
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities)
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities, new(sync.Mutex))

// If we process a message with a nil AgentIdentification, or an incorrect NewInstanceUid.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
Expand All @@ -208,3 +208,123 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
cancel()
}

func TestPackageUpdatesInParallel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
localPackageState := NewInMemPackagesStore()
sender := NewHTTPSender(&sharedinternal.NopLogger{})
ch := make(chan struct{})
tpaschalis marked this conversation as resolved.
Show resolved Hide resolved
doneCh := make([]<-chan struct{}, 0)

// Use this to simulate blocking behavior on the first call to Sync().
localPackageState.onAllPackagesHash = func() {
if localPackageState.lastReportedStatuses != nil {
<-ch
}
}

var messages atomic.Int32
var mux sync.Mutex
sender.callbacks = types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
err := msg.PackageSyncer.Sync(ctx)
assert.NoError(t, err)
messages.Add(1)
doneCh = append(doneCh, msg.PackageSyncer.Done())
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
},
}

clientSyncedState := &ClientSyncedState{}
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, localPackageState, capabilities, &mux)

sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package1": {
Type: protobufs.PackageType_PackageType_TopLevel,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: "foo",
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
})
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package22": {
Type: protobufs.PackageType_PackageType_TopLevel,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: "bar",
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
})

// Make sure that both Sync calls have gone through _before_ releasing the first.
// This means that they're both called in parallel, but locking makes sure
// that this is not a race condition.
assert.Eventually(t, func() bool {
return messages.Load() == 2
}, 2*time.Second, 100*time.Millisecond, "both messages must have been processed successfully")

// Release the first Sync call so it can continue and wait for both of them to complete.
ch <- struct{}{}
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
<-doneCh[0]
<-doneCh[1]

cancel()
}

func TestPackageUpdatesWithError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
sender := NewHTTPSender(&sharedinternal.NopLogger{})

// We'll pass in a nil PackageStateProvider to force the Sync call to return with an error.
localPackageState := types.PackagesStateProvider(nil)
var messages atomic.Int32
var mux sync.Mutex
sender.callbacks = types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
// Make sure the call to Sync will return an error due to a nil PackageStateProvider
err := msg.PackageSyncer.Sync(ctx)
assert.Error(t, err)
messages.Add(1)
},
}

clientSyncedState := &ClientSyncedState{}

capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, localPackageState, capabilities, &mux)

// Send two messages in parallel.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{},
})
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{},
})

// Make sure that even though the call to Sync errored out early, the lock
// was still released properly for both messages to be processed.
assert.Eventually(t, func() bool {
return messages.Load() == 2
}, 5*time.Second, 100*time.Millisecond, "both messages must have been processed successfully")

cancel()
}
5 changes: 5 additions & 0 deletions client/internal/inmempackagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type InMemPackagesStore struct {
fileContents map[string][]byte
fileHashes map[string][]byte
lastReportedStatuses *protobufs.PackageStatuses

onAllPackagesHash func()
}

var _ types.PackagesStateProvider = (*InMemPackagesStore)(nil)
Expand All @@ -28,6 +30,9 @@ func NewInMemPackagesStore() *InMemPackagesStore {
}

func (l *InMemPackagesStore) AllPackagesHash() ([]byte, error) {
if l.onAllPackagesHash != nil {
l.onAllPackagesHash()
}
return l.allPackagesHash, nil
}

Expand Down
19 changes: 18 additions & 1 deletion client/internal/packagessyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"errors"
"fmt"
"net/http"
"sync"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -20,6 +21,7 @@
sender Sender

statuses *protobufs.PackageStatuses
mux *sync.Mutex
doneCh chan struct{}
}

Expand All @@ -30,6 +32,7 @@
sender Sender,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
mux *sync.Mutex,
) *packagesSyncer {
return &packagesSyncer{
logger: logger,
Expand All @@ -38,6 +41,7 @@
clientSyncedState: clientSyncedState,
localState: packagesStateProvider,
doneCh: make(chan struct{}),
mux: mux,
}
}

Expand All @@ -49,15 +53,24 @@
}()

// Prepare package statuses.
// Grab a lock to make sure that package statuses are not overriden by
// another call to Sync running in parallel.
// In case Sync returns early with an error, take care of unlocking the
// mutex in this goroutine; otherwise it will be unlocked at the end
// of the sync operation.
s.mux.Lock()
if err := s.initStatuses(); err != nil {
s.mux.Unlock()
return err
}

if err := s.clientSyncedState.SetPackageStatuses(s.statuses); err != nil {
s.mux.Unlock()

Check warning on line 68 in client/internal/packagessyncer.go

View check run for this annotation

Codecov / codecov/patch

client/internal/packagessyncer.go#L68

Added line #L68 was not covered by tests
return err
}

// Now do the actual syncing in the background.
// Now do the actual syncing in the background and release the lock from
// inside of the goroutine.
go s.doSync(ctx)

return nil
Expand Down Expand Up @@ -99,6 +112,10 @@

// doSync performs the actual syncing process.
func (s *packagesSyncer) doSync(ctx context.Context) {
// Once doSync returns in a separate goroutine, make sure to release the
// mutex so that a new syncing process can take place.
defer s.mux.Unlock()

hash, err := s.localState.AllPackagesHash()
if err != nil {
s.logger.Errorf(ctx, "Package syncing failed: %V", err)
Expand Down
7 changes: 7 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"sync"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -24,6 +25,9 @@ type receivedProcessor struct {

packagesStateProvider types.PackagesStateProvider

// packageSyncMutex protects against multiple package syncing operations at the same time.
packageSyncMutex *sync.Mutex

// Agent's capabilities defined at Start() time.
capabilities protobufs.AgentCapabilities
}
Expand All @@ -35,6 +39,7 @@ func newReceivedProcessor(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) receivedProcessor {
return receivedProcessor{
logger: logger,
Expand All @@ -43,6 +48,7 @@ func newReceivedProcessor(
clientSyncedState: clientSyncedState,
packagesStateProvider: packagesStateProvider,
capabilities: capabilities,
packageSyncMutex: packageSyncMutex,
}
}

Expand Down Expand Up @@ -122,6 +128,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
r.sender,
r.clientSyncedState,
r.packagesStateProvider,
r.packageSyncMutex,
)
} else {
r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability")
Expand Down
4 changes: 3 additions & 1 deletion client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"sync"

"github.com/gorilla/websocket"
"github.com/open-telemetry/opamp-go/client/types"
Expand Down Expand Up @@ -32,13 +33,14 @@ func NewWSReceiver(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) *wsReceiver {
w := &wsReceiver{
conn: conn,
logger: logger,
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex),
stopped: make(chan struct{}),
}

Expand Down
Loading
Loading