Skip to content

Commit

Permalink
Enable clients to set flags
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
  • Loading branch information
tpaschalis committed Jun 19, 2024
1 parent c64c461 commit 570152d
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 3 deletions.
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ type OpAMPClient interface {
// for more details.
SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error

// SetFlags modifies the set of flags supported by the client.
// May be called anytime after Start(), including from OnMessage handler.
// The zero value of protobufs.AgentToServerFlags corresponds to FlagsUnspecified
// and is safe to use.
//
// See
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agenttoserverflags
// for more details.
SetFlags(flags protobufs.AgentToServerFlags)

// SendCustomMessage sends the custom message to the Server. May be called anytime after
// Start(), including from OnMessage handler.
//
Expand Down
50 changes: 50 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2122,3 +2122,53 @@ func TestSetCustomCapabilities(t *testing.T) {
assert.NoError(t, err)
})
}

// TestSetFlags tests the ability for the client to change the set of flags it sends.
func TestSetFlags(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
var rcvCustomFlags atomic.Value
var flags protobufs.AgentToServerFlags

srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
if msg.Flags != 0 {
rcvCustomFlags.Store(msg.Flags)
}
return nil
}

settings := types.StartSettings{}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// The zero value of AgentToServerFlags is ready to use
client.SetFlags(flags)

// Update flags to send
flags |= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid
client.SetFlags(flags)

// Verify new flags were delivered to the server
eventually(
t,
func() bool {
msg, ok := rcvCustomFlags.Load().(uint64)
if !ok || msg == 0 {
return false
}
return uint64(flags) == msg
},
)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
7 changes: 6 additions & 1 deletion client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,16 @@ func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) err
return c.common.SetPackageStatuses(statuses)
}

// SendCustomMessage implements OpAMPClient.SetCustomCapabilities.
// SendCustomCapabilities implements OpAMPClient.SetCustomCapabilities.
func (c *httpClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error {
return c.common.SetCustomCapabilities(customCapabilities)
}

// SetFlags implements OpAMPClient.SetFlags.
func (c *httpClient) SetFlags(flags protobufs.AgentToServerFlags) {
c.common.SetFlags(flags)
}

// SendCustomMessage implements OpAMPClient.SendCustomMessage.
func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
return c.common.SendCustomMessage(message)
Expand Down
13 changes: 13 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,19 @@ func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.Custo
return nil
}

func (c *ClientCommon) SetFlags(flags protobufs.AgentToServerFlags) {
// store the flags to send
c.ClientSyncedState.SetFlags(flags)

// send the new flags to the Server
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.Flags = uint64(flags)
},
)
c.sender.ScheduleSend()
}

// SendCustomMessage sends the specified custom message to the server.
func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
if message == nil {
Expand Down
13 changes: 11 additions & 2 deletions client/internal/clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ var (
)

// ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to
// have access to synchronize to the Server. 5 messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses and CustomCapabilities.
// have access to synchronize to the Server. Six messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags.
//
// See OpAMP spec for more details on how status reporting works:
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting
Expand All @@ -39,6 +39,7 @@ type ClientSyncedState struct {
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
customCapabilities *protobufs.CustomCapabilities
flags protobufs.AgentToServerFlags
}

func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription {
Expand Down Expand Up @@ -168,3 +169,11 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool {

return false
}

// SetFlags sets the flags in the state.
func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) {
defer s.mutex.Unlock()
s.mutex.Lock()

s.flags = flags
}
4 changes: 4 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (c *wsClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCap
return c.common.SetCustomCapabilities(customCapabilities)
}

func (c *wsClient) SetFlags(flags protobufs.AgentToServerFlags) {
c.common.SetFlags(flags)
}

func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
return c.common.SendCustomMessage(message)
}
Expand Down

0 comments on commit 570152d

Please sign in to comment.