Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable clients to set flags #286

Merged
merged 5 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ type OpAMPClient interface {
// for more details.
SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error

// SetFlags modifies the set of flags supported by the client.
// May be called before or after Start(), including from OnMessage handler.
// The zero value of protobufs.AgentToServerFlags corresponds to FlagsUnspecified
// and is safe to use.
//
// See
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agenttoserverflags
// for more details.
SetFlags(flags protobufs.AgentToServerFlags)

// SendCustomMessage sends the custom message to the Server. May be called anytime after
// Start(), including from OnMessage handler.
//
Expand Down
75 changes: 61 additions & 14 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func eventually(t *testing.T, f func() bool) {
assert.Eventually(t, f, 5*time.Second, 10*time.Millisecond)
}

func newInstanceUid(t *testing.T) types.InstanceUid {
func genNewInstanceUid(t *testing.T) types.InstanceUid {
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
uid, err := uuid.NewV7()
require.NoError(t, err)
b, err := uid.MarshalBinary()
Expand All @@ -103,7 +103,7 @@ func newInstanceUid(t *testing.T) types.InstanceUid {

func prepareSettings(t *testing.T, settings *types.StartSettings, c OpAMPClient) {
// Autogenerate instance id.
settings.InstanceUid = newInstanceUid(t)
settings.InstanceUid = genNewInstanceUid(t)

// Make sure correct URL scheme is used, based on the type of the OpAMP client.
u, err := url.Parse(settings.OpAMPServerURL)
Expand Down Expand Up @@ -630,27 +630,24 @@ func TestAgentIdentification(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
// Start a server.
srv := internal.StartMockServer(t)
newInstanceUid := newInstanceUid(t)
newInstanceUid := genNewInstanceUid(t)
var rcvAgentInstanceUid atomic.Value
var sentInvalidId atomic.Bool
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
rcvAgentInstanceUid.Store(msg.InstanceUid)
if sentInvalidId.Load() {
if msg.Flags&uint64(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) == 1 {
newInstanceUid = genNewInstanceUid(t)
rcvAgentInstanceUid.Store(newInstanceUid[:])
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
AgentIdentification: &protobufs.AgentIdentification{
// If we sent the invalid one first, send a valid one now
// If the RequestInstanceUid flag was set, populate this field.
NewInstanceUid: newInstanceUid[:],
},
}
}
sentInvalidId.Store(true)
rcvAgentInstanceUid.Store(msg.InstanceUid)
// Start by sending just the old instance ID.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
AgentIdentification: &protobufs.AgentIdentification{
// Start by sending an invalid id forcing an error.
NewInstanceUid: nil,
},
}
}

Expand Down Expand Up @@ -689,8 +686,8 @@ func TestAgentIdentification(t *testing.T) {
},
)

// Send a dummy message again to get the _new_ id
_ = client.SetAgentDescription(createAgentDescr())
// Set the flags to request a new ID.
client.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)

// When it was sent, the new instance uid should have been used, which should
// have been observed by the Server
Expand Down Expand Up @@ -2122,3 +2119,53 @@ func TestSetCustomCapabilities(t *testing.T) {
assert.NoError(t, err)
})
}

// TestSetFlags tests the ability for the client to change the set of flags it sends.
func TestSetFlags(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
var rcvCustomFlags atomic.Value
var flags protobufs.AgentToServerFlags

srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.Flags != 0 {
rcvCustomFlags.Store(msg.Flags)
}
return nil
}

settings := types.StartSettings{}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// The zero value of AgentToServerFlags is ready to use
client.SetFlags(flags)
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved

// Update flags to send
flags |= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid
client.SetFlags(flags)

// Verify new flags were delivered to the server
eventually(
t,
func() bool {
msg, ok := rcvCustomFlags.Load().(uint64)
if !ok || msg == 0 {
return false
}
return uint64(flags) == msg
},
)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
7 changes: 6 additions & 1 deletion client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,16 @@ func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) err
return c.common.SetPackageStatuses(statuses)
}

// SendCustomMessage implements OpAMPClient.SetCustomCapabilities.
// SendCustomCapabilities implements OpAMPClient.SetCustomCapabilities.
func (c *httpClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error {
return c.common.SetCustomCapabilities(customCapabilities)
}

// SetFlags implements OpAMPClient.SetFlags.
func (c *httpClient) SetFlags(flags protobufs.AgentToServerFlags) {
c.common.SetFlags(flags)
}

// SendCustomMessage implements OpAMPClient.SendCustomMessage.
func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
return c.common.SendCustomMessage(message)
Expand Down
14 changes: 14 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
msg.PackageStatuses = c.ClientSyncedState.PackageStatuses()
msg.Capabilities = uint64(c.Capabilities)
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
msg.Flags = c.ClientSyncedState.Flags()
},
)
return nil
Expand Down Expand Up @@ -385,6 +386,19 @@ func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.Custo
return nil
}

func (c *ClientCommon) SetFlags(flags protobufs.AgentToServerFlags) {
// store the flags to send
c.ClientSyncedState.SetFlags(flags)

// send the new flags to the Server
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.Flags = uint64(flags)
},
)
c.sender.ScheduleSend()
}

// SendCustomMessage sends the specified custom message to the server.
func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
if message == nil {
Expand Down
19 changes: 17 additions & 2 deletions client/internal/clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ var (
)

// ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to
// have access to synchronize to the Server. 5 messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses and CustomCapabilities.
// have access to synchronize to the Server. Six messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags.
//
// See OpAMP spec for more details on how status reporting works:
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting
Expand All @@ -39,6 +39,7 @@ type ClientSyncedState struct {
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
customCapabilities *protobufs.CustomCapabilities
flags protobufs.AgentToServerFlags
}

func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription {
Expand Down Expand Up @@ -71,6 +72,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities {
return s.customCapabilities
}

func (s *ClientSyncedState) Flags() uint64 {
defer s.mutex.Unlock()
s.mutex.Lock()
return uint64(s.flags)
}

// SetAgentDescription sets the AgentDescription in the state.
func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error {
if descr == nil {
Expand Down Expand Up @@ -168,3 +175,11 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool {

return false
}

// SetFlags sets the flags in the state.
func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) {
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
defer s.mutex.Unlock()
s.mutex.Lock()

s.flags = flags
}
36 changes: 36 additions & 0 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,39 @@ func TestHTTPSenderRetryForFailedRequests(t *testing.T) {
cancel()
srv.Close()
}

func TestRequestInstanceUidFlagReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

sender := NewHTTPSender(&sharedinternal.NopLogger{})
sender.callbacks = types.CallbacksStruct{}

// Set the RequestInstanceUid flag on the tracked state to request the server for a new ID to use.
clientSyncedState := &ClientSyncedState{}
clientSyncedState.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
capabilities := protobufs.AgentCapabilities_AgentCapabilities_Unspecified
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities)

// If we process a message with a nil AgentIdentification, or an incorrect NewInstanceUid.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
AgentIdentification: nil,
})
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
AgentIdentification: &protobufs.AgentIdentification{NewInstanceUid: []byte("foo")},
})

// Then the RequestInstanceUid flag stays intact.
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)

// If we process a message that contains a non-nil AgentIdentification that contains a NewInstanceUid.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
AgentIdentification: &protobufs.AgentIdentification{NewInstanceUid: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}},
})

// Then the flag is reset so we don't request a new instance uid yet again.
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
cancel()
}
6 changes: 5 additions & 1 deletion client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ func (r *receivedProcessor) rcvFlags(
msg.RemoteConfigStatus = r.clientSyncedState.RemoteConfigStatus()
msg.PackageStatuses = r.clientSyncedState.PackageStatuses()
msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities()
msg.Flags = r.clientSyncedState.Flags()

// The logic for EffectiveConfig is similar to the previous 5 sub-messages however
// The logic for EffectiveConfig is similar to the previous 6 sub-messages however
// the EffectiveConfig is fetched using GetEffectiveConfig instead of
// from clientSyncedState. We do this to avoid keeping EffectiveConfig in-memory.
msg.EffectiveConfig = cfg
Expand Down Expand Up @@ -237,6 +238,9 @@ func (r *receivedProcessor) rcvAgentIdentification(ctx context.Context, agentId
return err
}

// If we set up a new instance ID, reset the RequestInstanceUid flag.
r.clientSyncedState.flags &^= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (c *wsClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCap
return c.common.SetCustomCapabilities(customCapabilities)
}

func (c *wsClient) SetFlags(flags protobufs.AgentToServerFlags) {
c.common.SetFlags(flags)
}

func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
return c.common.SendCustomMessage(message)
}
Expand Down
Loading