From 570152db797ea58b18c974a5c25ec54617bcef7f Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Mon, 10 Jun 2024 12:03:45 +0300 Subject: [PATCH 1/5] Enable clients to set flags Signed-off-by: Paschalis Tsilias --- client/client.go | 10 +++++++ client/clientimpl_test.go | 50 +++++++++++++++++++++++++++++++++ client/httpclient.go | 7 ++++- client/internal/clientcommon.go | 13 +++++++++ client/internal/clientstate.go | 13 +++++++-- client/wsclient.go | 4 +++ 6 files changed, 94 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index da9a8181..76b6e06d 100644 --- a/client/client.go +++ b/client/client.go @@ -107,6 +107,16 @@ type OpAMPClient interface { // for more details. SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error + // SetFlags modifies the set of flags supported by the client. + // May be called anytime after Start(), including from OnMessage handler. + // The zero value of protobufs.AgentToServerFlags corresponds to FlagsUnspecified + // and is safe to use. + // + // See + // https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agenttoserverflags + // for more details. + SetFlags(flags protobufs.AgentToServerFlags) + // SendCustomMessage sends the custom message to the Server. May be called anytime after // Start(), including from OnMessage handler. // diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 46e8510b..05d98d89 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2122,3 +2122,53 @@ func TestSetCustomCapabilities(t *testing.T) { assert.NoError(t, err) }) } + +// TestSetFlags tests the ability for the client to change the set of flags it sends. +func TestSetFlags(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + + // Start a Server. + srv := internal.StartMockServer(t) + var rcvCustomFlags atomic.Value + var flags protobufs.AgentToServerFlags + + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + if msg.Flags != 0 { + rcvCustomFlags.Store(msg.Flags) + } + return nil + } + + settings := types.StartSettings{} + settings.OpAMPServerURL = "ws://" + srv.Endpoint + prepareClient(t, &settings, client) + + assert.NoError(t, client.Start(context.Background(), settings)) + + // The zero value of AgentToServerFlags is ready to use + client.SetFlags(flags) + + // Update flags to send + flags |= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid + client.SetFlags(flags) + + // Verify new flags were delivered to the server + eventually( + t, + func() bool { + msg, ok := rcvCustomFlags.Load().(uint64) + if !ok || msg == 0 { + return false + } + return uint64(flags) == msg + }, + ) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) +} diff --git a/client/httpclient.go b/client/httpclient.go index 117c8187..b27c249c 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -105,11 +105,16 @@ func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) err return c.common.SetPackageStatuses(statuses) } -// SendCustomMessage implements OpAMPClient.SetCustomCapabilities. +// SendCustomCapabilities implements OpAMPClient.SetCustomCapabilities. func (c *httpClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error { return c.common.SetCustomCapabilities(customCapabilities) } +// SetFlags implements OpAMPClient.SetFlags. +func (c *httpClient) SetFlags(flags protobufs.AgentToServerFlags) { + c.common.SetFlags(flags) +} + // SendCustomMessage implements OpAMPClient.SendCustomMessage. func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { return c.common.SendCustomMessage(message) diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 4521ff94..648a7415 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -385,6 +385,19 @@ func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.Custo return nil } +func (c *ClientCommon) SetFlags(flags protobufs.AgentToServerFlags) { + // store the flags to send + c.ClientSyncedState.SetFlags(flags) + + // send the new flags to the Server + c.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.Flags = uint64(flags) + }, + ) + c.sender.ScheduleSend() +} + // SendCustomMessage sends the specified custom message to the server. func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { if message == nil { diff --git a/client/internal/clientstate.go b/client/internal/clientstate.go index 97eb4dc9..9c112e3c 100644 --- a/client/internal/clientstate.go +++ b/client/internal/clientstate.go @@ -17,8 +17,8 @@ var ( ) // ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to -// have access to synchronize to the Server. 5 messages can be stored in this store: -// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses and CustomCapabilities. +// have access to synchronize to the Server. Six messages can be stored in this store: +// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags. // // See OpAMP spec for more details on how status reporting works: // https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting @@ -39,6 +39,7 @@ type ClientSyncedState struct { remoteConfigStatus *protobufs.RemoteConfigStatus packageStatuses *protobufs.PackageStatuses customCapabilities *protobufs.CustomCapabilities + flags protobufs.AgentToServerFlags } func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription { @@ -168,3 +169,11 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool { return false } + +// SetFlags sets the flags in the state. +func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) { + defer s.mutex.Unlock() + s.mutex.Lock() + + s.flags = flags +} diff --git a/client/wsclient.go b/client/wsclient.go index b017a44e..030972fc 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -129,6 +129,10 @@ func (c *wsClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCap return c.common.SetCustomCapabilities(customCapabilities) } +func (c *wsClient) SetFlags(flags protobufs.AgentToServerFlags) { + c.common.SetFlags(flags) +} + func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { return c.common.SendCustomMessage(message) } From bd001cf181130023724040f1518f855b2e193302 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Wed, 19 Jun 2024 18:29:14 +0300 Subject: [PATCH 2/5] Handle flags in a) PrepareFirstMessage, b) ReportFullState; reset the flag after a new ID was requested and set correctly Signed-off-by: Paschalis Tsilias --- client/clientimpl_test.go | 25 +++++++++++-------------- client/internal/clientcommon.go | 1 + client/internal/receivedprocessor.go | 6 +++++- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 05d98d89..6f513d0a 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -93,7 +93,7 @@ func eventually(t *testing.T, f func() bool) { assert.Eventually(t, f, 5*time.Second, 10*time.Millisecond) } -func newInstanceUid(t *testing.T) types.InstanceUid { +func genNewInstanceUid(t *testing.T) types.InstanceUid { uid, err := uuid.NewV7() require.NoError(t, err) b, err := uid.MarshalBinary() @@ -103,7 +103,7 @@ func newInstanceUid(t *testing.T) types.InstanceUid { func prepareSettings(t *testing.T, settings *types.StartSettings, c OpAMPClient) { // Autogenerate instance id. - settings.InstanceUid = newInstanceUid(t) + settings.InstanceUid = genNewInstanceUid(t) // Make sure correct URL scheme is used, based on the type of the OpAMP client. u, err := url.Parse(settings.OpAMPServerURL) @@ -630,27 +630,24 @@ func TestAgentIdentification(t *testing.T) { testClients(t, func(t *testing.T, client OpAMPClient) { // Start a server. srv := internal.StartMockServer(t) - newInstanceUid := newInstanceUid(t) + newInstanceUid := genNewInstanceUid(t) var rcvAgentInstanceUid atomic.Value - var sentInvalidId atomic.Bool srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { - rcvAgentInstanceUid.Store(msg.InstanceUid) - if sentInvalidId.Load() { + if msg.Flags&uint64(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) == 1 { + newInstanceUid = genNewInstanceUid(t) + rcvAgentInstanceUid.Store(newInstanceUid[:]) return &protobufs.ServerToAgent{ InstanceUid: msg.InstanceUid, AgentIdentification: &protobufs.AgentIdentification{ - // If we sent the invalid one first, send a valid one now + // If the RequestInstanceUid flag was set, populate this field. NewInstanceUid: newInstanceUid[:], }, } } - sentInvalidId.Store(true) + rcvAgentInstanceUid.Store(msg.InstanceUid) + // Start by sending just the old instance ID. return &protobufs.ServerToAgent{ InstanceUid: msg.InstanceUid, - AgentIdentification: &protobufs.AgentIdentification{ - // Start by sending an invalid id forcing an error. - NewInstanceUid: nil, - }, } } @@ -689,8 +686,8 @@ func TestAgentIdentification(t *testing.T) { }, ) - // Send a dummy message again to get the _new_ id - _ = client.SetAgentDescription(createAgentDescr()) + // Set the flags to request a new ID. + client.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) // When it was sent, the new instance uid should have been used, which should // have been observed by the Server diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 648a7415..a4786385 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -214,6 +214,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { msg.PackageStatuses = c.ClientSyncedState.PackageStatuses() msg.Capabilities = uint64(c.Capabilities) msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities() + msg.Flags = uint64(c.ClientSyncedState.flags) }, ) return nil diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 736073d5..a1388628 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -190,8 +190,9 @@ func (r *receivedProcessor) rcvFlags( msg.RemoteConfigStatus = r.clientSyncedState.RemoteConfigStatus() msg.PackageStatuses = r.clientSyncedState.PackageStatuses() msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities() + msg.Flags = uint64(r.clientSyncedState.flags) - // The logic for EffectiveConfig is similar to the previous 5 sub-messages however + // The logic for EffectiveConfig is similar to the previous 6 sub-messages however // the EffectiveConfig is fetched using GetEffectiveConfig instead of // from clientSyncedState. We do this to avoid keeping EffectiveConfig in-memory. msg.EffectiveConfig = cfg @@ -237,6 +238,9 @@ func (r *receivedProcessor) rcvAgentIdentification(ctx context.Context, agentId return err } + // If we set up a new instance ID, reset the RequestInstanceUid flag. + r.clientSyncedState.flags &^= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid + return nil } From ca66f1d9f9621308d01b2d1a11aa076c04d70ad4 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Thu, 20 Jun 2024 12:00:35 +0300 Subject: [PATCH 3/5] Add test around flag resetting Signed-off-by: Paschalis Tsilias --- client/internal/httpsender_test.go | 36 ++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/client/internal/httpsender_test.go b/client/internal/httpsender_test.go index 420bc9e1..65df9c5b 100644 --- a/client/internal/httpsender_test.go +++ b/client/internal/httpsender_test.go @@ -172,3 +172,39 @@ func TestHTTPSenderRetryForFailedRequests(t *testing.T) { cancel() srv.Close() } + +func TestRequestInstanceUidFlagReset(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + sender := NewHTTPSender(&sharedinternal.NopLogger{}) + sender.callbacks = types.CallbacksStruct{} + + // Set the RequestInstanceUid flag on the tracked state to request the server for a new ID to use. + clientSyncedState := &ClientSyncedState{} + clientSyncedState.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) + capabilities := protobufs.AgentCapabilities_AgentCapabilities_Unspecified + sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities) + + // If we process a message with a nil AgentIdentification, or an incorrect NewInstanceUid. + sender.receiveProcessor.ProcessReceivedMessage(ctx, + &protobufs.ServerToAgent{ + AgentIdentification: nil, + }) + sender.receiveProcessor.ProcessReceivedMessage(ctx, + &protobufs.ServerToAgent{ + AgentIdentification: &protobufs.AgentIdentification{NewInstanceUid: []byte("foo")}, + }) + + // Then the RequestInstanceUid flag stays intact. + assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) + + // If we process a message that contains a non-nil AgentIdentification that contains a NewInstanceUid. + sender.receiveProcessor.ProcessReceivedMessage(ctx, + &protobufs.ServerToAgent{ + AgentIdentification: &protobufs.AgentIdentification{NewInstanceUid: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}}, + }) + + // Then the flag is reset so we don't request a new instance uid yet again. + assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified) + cancel() +} From 93a542d54f8a06d723f624f4923444ca5a7dd4e4 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Thu, 20 Jun 2024 22:57:16 +0300 Subject: [PATCH 4/5] Address review feedback Signed-off-by: Paschalis Tsilias --- client/client.go | 2 +- client/internal/clientcommon.go | 2 +- client/internal/clientstate.go | 6 ++++++ client/internal/receivedprocessor.go | 2 +- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index 76b6e06d..afc08315 100644 --- a/client/client.go +++ b/client/client.go @@ -108,7 +108,7 @@ type OpAMPClient interface { SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error // SetFlags modifies the set of flags supported by the client. - // May be called anytime after Start(), including from OnMessage handler. + // May be called before or after Start(), including from OnMessage handler. // The zero value of protobufs.AgentToServerFlags corresponds to FlagsUnspecified // and is safe to use. // diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index a4786385..a8df3640 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -214,7 +214,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { msg.PackageStatuses = c.ClientSyncedState.PackageStatuses() msg.Capabilities = uint64(c.Capabilities) msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities() - msg.Flags = uint64(c.ClientSyncedState.flags) + msg.Flags = c.ClientSyncedState.Flags() }, ) return nil diff --git a/client/internal/clientstate.go b/client/internal/clientstate.go index 9c112e3c..93250c9f 100644 --- a/client/internal/clientstate.go +++ b/client/internal/clientstate.go @@ -72,6 +72,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities { return s.customCapabilities } +func (s *ClientSyncedState) Flags() uint64 { + defer s.mutex.Unlock() + s.mutex.Lock() + return uint64(s.flags) +} + // SetAgentDescription sets the AgentDescription in the state. func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error { if descr == nil { diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index a1388628..bd18a146 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -190,7 +190,7 @@ func (r *receivedProcessor) rcvFlags( msg.RemoteConfigStatus = r.clientSyncedState.RemoteConfigStatus() msg.PackageStatuses = r.clientSyncedState.PackageStatuses() msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities() - msg.Flags = uint64(r.clientSyncedState.flags) + msg.Flags = r.clientSyncedState.Flags() // The logic for EffectiveConfig is similar to the previous 6 sub-messages however // the EffectiveConfig is fetched using GetEffectiveConfig instead of From 1f74dfd599541fa5266e7dfe3b659945616e09a2 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Wed, 26 Jun 2024 17:37:03 +0300 Subject: [PATCH 5/5] Add TestSetFlagsBeforeStart testcase Signed-off-by: Paschalis Tsilias --- client/clientimpl_test.go | 51 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 6f513d0a..78ecb5c9 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2169,3 +2169,54 @@ func TestSetFlags(t *testing.T) { assert.NoError(t, err) }) } + +// TestSetFlags tests the ability for the client to set its flags before starting up. +func TestSetFlagsBeforeStart(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + // Start a Server. + flags := protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid + srv := internal.StartMockServer(t) + var rcvCustomFlags atomic.Value + var isFirstMessage atomic.Bool + isFirstMessage.Store(true) + + // Make sure we only record flags from the very first message. + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + if isFirstMessage.Load() { + rcvCustomFlags.Store(msg.Flags) + } + isFirstMessage.Store(false) + return nil + } + + settings := types.StartSettings{} + settings.OpAMPServerURL = "ws://" + srv.Endpoint + prepareClient(t, &settings, client) + + // Set up the flags _before_ calling Start to verify that they're + // handled correctly in PrepareFirstMessage. + client.SetFlags(flags) + + // Start the client. + assert.NoError(t, client.Start(context.Background(), settings)) + + // Verify the flags were delivered to the server during the first message. + eventually( + t, + func() bool { + msg, ok := rcvCustomFlags.Load().(uint64) + if !ok || msg == 0 { + return false + } + return uint64(flags) == msg + }, + ) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) +}