From b304293246059442310e900766b45712f9b77e82 Mon Sep 17 00:00:00 2001
From: Matt Keeler
Date: Tue, 23 May 2023 09:02:14 -0400
Subject: [PATCH 1/2] Implement a Catalog Controllers Lifecycle Integration Test

---
 internal/catalog/catalogtest/run_test.go       |   5 +
 .../catalogtest/test_integration_v1alpha1.go   |   1 +
 .../catalogtest/test_lifecycle_v1alpha1.go     | 706 ++++++++++++++++++
 internal/catalog/exports.go                    |  18 +
 internal/resource/resourcetest/builder.go      |  17 +-
 internal/resource/resourcetest/client.go       |  20 +-
 6 files changed, 765 insertions(+), 2 deletions(-)
 create mode 100644 internal/catalog/catalogtest/test_lifecycle_v1alpha1.go

diff --git a/internal/catalog/catalogtest/run_test.go b/internal/catalog/catalogtest/run_test.go
index 7c17052d8246..defaad2a16d6 100644
--- a/internal/catalog/catalogtest/run_test.go
+++ b/internal/catalog/catalogtest/run_test.go
@@ -37,3 +37,8 @@ func TestControllers_Integration(t *testing.T) {
 	client := runInMemResourceServiceAndControllers(t, catalog.DefaultControllerDependencies())
 	RunCatalogV1Alpha1IntegrationTest(t, client)
 }
+
+func TestControllers_Lifecycle(t *testing.T) {
+	client := runInMemResourceServiceAndControllers(t, catalog.DefaultControllerDependencies())
+	RunCatalogV1Alpha1LifecycleIntegrationTest(t, client)
+}
diff --git a/internal/catalog/catalogtest/test_integration_v1alpha1.go b/internal/catalog/catalogtest/test_integration_v1alpha1.go
index 8a7f4cd9a248..19be6d7a4846 100644
--- a/internal/catalog/catalogtest/test_integration_v1alpha1.go
+++ b/internal/catalog/catalogtest/test_integration_v1alpha1.go
@@ -698,6 +698,7 @@ func expectedGRPCApiServiceEndpoints(t *testing.T, c *rtest.Client) *pbcatalog.S
 }
 
 func verifyServiceEndpoints(t *testing.T, c *rtest.Client, id *pbresource.ID, expected *pbcatalog.ServiceEndpoints) {
+	t.Helper()
 	c.WaitForResourceState(t, id, func(t rtest.T, res *pbresource.Resource) {
 		var actual pbcatalog.ServiceEndpoints
 		err := res.Data.UnmarshalTo(&actual)
diff --git a/internal/catalog/catalogtest/test_lifecycle_v1alpha1.go b/internal/catalog/catalogtest/test_lifecycle_v1alpha1.go
new file mode 100644
index 000000000000..d7529a6ec48c
--- /dev/null
+++ b/internal/catalog/catalogtest/test_lifecycle_v1alpha1.go
@@ -0,0 +1,706 @@
+package catalogtest
+
+import (
+	"testing"
+
+	"github.com/hashicorp/consul/internal/catalog"
+	rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
+	pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
+	"github.com/hashicorp/consul/proto-public/pbresource"
+	"github.com/hashicorp/consul/sdk/testutil"
+)
+
+// RunCatalogV1Alpha1LifecycleIntegrationTest intends to exercise the functionality of
+// managing catalog resources over their normal lifecycle, where they will be modified
+// several times, change state, etc.
+func RunCatalogV1Alpha1LifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
+	t.Helper()
+
+	testutil.RunStep(t, "node-lifecycle", func(t *testing.T) {
+		RunCatalogV1Alpha1NodeLifecycleIntegrationTest(t, client)
+	})
+
+	testutil.RunStep(t, "workload-lifecycle", func(t *testing.T) {
+		RunCatalogV1Alpha1WorkloadLifecycleIntegrationTest(t, client)
+	})
+
+	testutil.RunStep(t, "endpoints-lifecycle", func(t *testing.T) {
+		RunCatalogV1Alpha1EndpointsLifecycleIntegrationTest(t, client)
+	})
+}
+
+// RunCatalogV1Alpha1NodeLifecycleIntegrationTest verifies correct functionality of
+// the node-health controller. This test will exercise the following behaviors:
+//
+// * Creating a Node without associated HealthStatuses will mark the node as passing
+// * Associating a HealthStatus with a Node will cause recomputation of the Health
+// * Changing HealthStatus to a worse health will cause recomputation of the Health
+// * Changing HealthStatus to a better health will cause recomputation of the Health
+// * Deletion of associated HealthStatuses will recompute the Health (back to passing)
+// * Deletion of the node will cause deletion of associated health statuses
+func RunCatalogV1Alpha1NodeLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
+	c := rtest.NewClient(client)
+
+	nodeName := "test-lifecycle"
+	nodeHealthName := "test-lifecycle-node-status"
+
+	// initial node creation
+	node := rtest.Resource(catalog.NodeV1Alpha1Type, nodeName).
+		WithData(t, &pbcatalog.Node{
+			Addresses: []*pbcatalog.NodeAddress{
+				{Host: "172.16.2.3"},
+				{Host: "198.18.2.3", External: true},
+			},
+		}).
+		Write(t, c)
+
+	// wait for the node health controller to mark the node as healthy
+	c.WaitForStatusCondition(t, node.Id,
+		catalog.NodeHealthStatusKey,
+		catalog.NodeHealthConditions[pbcatalog.Health_HEALTH_PASSING])
+
+	// It's easy enough to simply repeatedly set the health status, and it proves
+	// that going both from better to worse health and worse to better all
+	// happen as expected. We leave the health in a warning state to allow for
+	// the subsequent health status deletion to cause the health to go back
+	// to passing.
+	healthChanges := []pbcatalog.Health{
+		pbcatalog.Health_HEALTH_PASSING,
+		pbcatalog.Health_HEALTH_WARNING,
+		pbcatalog.Health_HEALTH_CRITICAL,
+		pbcatalog.Health_HEALTH_MAINTENANCE,
+		pbcatalog.Health_HEALTH_CRITICAL,
+		pbcatalog.Health_HEALTH_WARNING,
+		pbcatalog.Health_HEALTH_PASSING,
+		pbcatalog.Health_HEALTH_WARNING,
+	}
+
+	// This will be set within the loop and used afterwards to delete the health status
+	var nodeHealth *pbresource.Resource
+
+	// Iterate through the various desired health statuses, updating
+	// a HealthStatus resource owned by the node and waiting for
+	// reconciliation at each point
+	for _, health := range healthChanges {
+		// update the health check
+		nodeHealth = setHealthStatus(t, c, node.Id, nodeHealthName, health)
+
+		// wait for reconciliation to kick in and put the node into the right
+		// health status.
+		c.WaitForStatusCondition(t, node.Id,
+			catalog.NodeHealthStatusKey,
+			catalog.NodeHealthConditions[health])
+	}
+
+	// now delete the health status and ensure things go back to passing
+	c.MustDelete(t, nodeHealth.Id)
+
+	// wait for the node health controller to mark the node as healthy
+	c.WaitForStatusCondition(t, node.Id,
+		catalog.NodeHealthStatusKey,
+		catalog.NodeHealthConditions[pbcatalog.Health_HEALTH_PASSING])
+
+	// Add the health status back once more; the actual status doesn't matter.
+	// It just must be owned by the node so that we can show cascading
+	// deletions of owned health statuses working.
+	healthStatus := setHealthStatus(t, c, node.Id, nodeHealthName, pbcatalog.Health_HEALTH_CRITICAL)
+
+	// Delete the node and wait for the health status to be deleted.
+	c.MustDelete(t, node.Id)
+	c.WaitForDeletion(t, healthStatus.Id)
+}
+
+// RunCatalogV1Alpha1WorkloadLifecycleIntegrationTest verifies correct functionality of
+// the workload-health controller. This test will exercise the following behaviors:
+//
+//   - Associating a workload with a node causes recomputation of the health and takes
+//     into account the node's health
+//   - Modifying the workload's associated node causes health recomputation and takes into
+//     account the new node's health
+//   - Removal of the node association causes recomputation of health, with no node health
+//     taken into account.
+//   - A workload created without associated health statuses or a node association will
+//     be marked passing
+//   - A workload created without associated health statuses but with a node will
+//     inherit its health from the node.
+//   - Changing HealthStatus to a worse health will cause recomputation of the Health
+//   - Changing HealthStatus to a better health will cause recomputation of the Health
+//   - Overall health is computed as the worst health amongst the node's health and all
+//     of the workload's associated HealthStatuses
+//   - Deletion of the workload will cause deletion of all associated health statuses.
+func RunCatalogV1Alpha1WorkloadLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
+	c := rtest.NewClient(client)
+	testutil.RunStep(t, "nodeless-workload", func(t *testing.T) {
+		runV1Alpha1NodelessWorkloadLifecycleIntegrationTest(t, c)
+	})
+
+	testutil.RunStep(t, "node-associated-workload", func(t *testing.T) {
+		runV1Alpha1NodeAssociatedWorkloadLifecycleIntegrationTest(t, c)
+	})
+}
+
+// runV1Alpha1NodelessWorkloadLifecycleIntegrationTest verifies correct functionality of
+// the workload-health controller for workloads without node associations. In particular,
+// the following behaviors are being tested:
+//
+//   - A workload created without associated health statuses or a node association will
+//     be marked passing
+//   - Changing HealthStatus to a worse health will cause recomputation of the Health
+//   - Changing HealthStatus to a better health will cause recomputation of the Health
+//   - Deletion of the associated HealthStatus for a nodeless workload will set its health back to passing
+//   - Deletion of the workload will cause deletion of all associated health statuses.
+func runV1Alpha1NodelessWorkloadLifecycleIntegrationTest(t *testing.T, c *rtest.Client) {
+	workloadName := "test-lifecycle-workload"
+	workloadHealthName := "test-lifecycle-workload-status"
+
+	// create a workload without a node association or health statuses yet
+	workload := rtest.Resource(catalog.WorkloadV1Alpha1Type, workloadName).
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{
+				{Host: "198.18.9.8"},
+			},
+			Ports: map[string]*pbcatalog.WorkloadPort{
+				"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
+			},
+			Identity: "test-lifecycle",
+		}).
+		Write(t, c)
+
+	// wait for the workload health controller to mark the workload as healthy
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadHealthConditions[pbcatalog.Health_HEALTH_PASSING])
+
+	// We may not need to iterate through all of these states but it's easy
+	// enough and quick enough to do so. The general rationale is that we
+	// should move through changing the workload's associated health status
+	// in this progression. We can prove that moving from better to worse
+	// health or worse to better both function correctly.
+	healthChanges := []pbcatalog.Health{
+		pbcatalog.Health_HEALTH_PASSING,
+		pbcatalog.Health_HEALTH_WARNING,
+		pbcatalog.Health_HEALTH_CRITICAL,
+		pbcatalog.Health_HEALTH_MAINTENANCE,
+		pbcatalog.Health_HEALTH_CRITICAL,
+		pbcatalog.Health_HEALTH_WARNING,
+		pbcatalog.Health_HEALTH_PASSING,
+		pbcatalog.Health_HEALTH_WARNING,
+	}
+
+	var workloadHealth *pbresource.Resource
+	// Iterate through the various desired health statuses, updating
+	// a HealthStatus resource owned by the workload and waiting for
+	// reconciliation at each point
+	for _, health := range healthChanges {
+		// update the health status
+		workloadHealth = setHealthStatus(t, c, workload.Id, workloadHealthName, health)
+
+		// wait for reconciliation to kick in and put the workload into
+		// the right health status.
+		c.WaitForStatusCondition(t, workload.Id,
+			catalog.WorkloadHealthStatusKey,
+			catalog.WorkloadHealthConditions[health])
+	}
+
+	// Now delete the health status; things should go back to the passing status
+	c.MustDelete(t, workloadHealth.Id)
+
+	// ensure the workload's health went back to passing
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadHealthConditions[pbcatalog.Health_HEALTH_PASSING])
+
+	// Reset the workload health. The actual health is irrelevant; we just want it
+	// to exist to prove that Health Statuses get deleted along with the workload
+	// when it is deleted.
+	workloadHealth = setHealthStatus(t, c, workload.Id, workloadHealthName, pbcatalog.Health_HEALTH_WARNING)
+
+	// Delete the workload and wait for the HealthStatus to also be deleted
+	c.MustDelete(t, workload.Id)
+	c.WaitForDeletion(t, workloadHealth.Id)
+}
+
+// runV1Alpha1NodeAssociatedWorkloadLifecycleIntegrationTest verifies correct functionality of
+// the workload-health controller. This test will exercise the following behaviors:
+//
+//   - Associating a workload with a node causes recomputation of the health and takes
+//     into account the node's health
+//   - Modifying the workload's associated node causes health recomputation and takes into
+//     account the new node's health
+//   - Removal of the node association causes recomputation of health, with no node health
+//     taken into account.
+//   - A workload created without associated health statuses but with a node will
+//     inherit its health from the node.
+//   - Overall health is computed as the worst health amongst the node's health and all
+//     of the workload's associated HealthStatuses
+func runV1Alpha1NodeAssociatedWorkloadLifecycleIntegrationTest(t *testing.T, c *rtest.Client) {
+	workloadName := "test-lifecycle"
+	workloadHealthName := "test-lifecycle"
+	nodeName1 := "test-lifecycle-1"
+	nodeName2 := "test-lifecycle-2"
+	nodeHealthName1 := "test-lifecycle-node-1"
+	nodeHealthName2 := "test-lifecycle-node-2"
+
+	// Insert some nodes to link the workloads to at various points throughout the test
+	node1 := rtest.Resource(catalog.NodeV1Alpha1Type, nodeName1).
+		WithData(t, &pbcatalog.Node{
+			Addresses: []*pbcatalog.NodeAddress{{Host: "172.17.9.10"}},
+		}).
+		Write(t, c)
+	node2 := rtest.Resource(catalog.NodeV1Alpha1Type, nodeName2).
+		WithData(t, &pbcatalog.Node{
+			Addresses: []*pbcatalog.NodeAddress{{Host: "172.17.9.11"}},
+		}).
+		Write(t, c)
+
+	// Set some non-passing health statuses for those nodes. Using non-passing will make
+	// it easy to see that changing a passing workload's node association appropriately
+	// impacts the overall workload health.
+	setHealthStatus(t, c, node1.Id, nodeHealthName1, pbcatalog.Health_HEALTH_CRITICAL)
+	setHealthStatus(t, c, node2.Id, nodeHealthName2, pbcatalog.Health_HEALTH_WARNING)
+
+	// Add the workload but don't immediately associate it with any node.
+	workload := rtest.Resource(catalog.WorkloadV1Alpha1Type, workloadName).
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{
+				{Host: "198.18.9.8"},
+			},
+			Ports: map[string]*pbcatalog.WorkloadPort{
+				"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
+			},
+			Identity: "test-lifecycle",
+		}).
+		Write(t, c)
+
+	// wait for the workload health controller to mark the workload as healthy
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadHealthConditions[pbcatalog.Health_HEALTH_PASSING])
+
+	// now modify the workload to associate it with node 1 (currently with CRITICAL health)
+	workload = rtest.ResourceID(workload.Id).
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{{Host: "198.18.9.8"}},
+			Ports:     map[string]*pbcatalog.WorkloadPort{"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
+			Identity:  "test-lifecycle",
+			// this is the only difference from the previous write
+			NodeName: node1.Id.Name,
+		}).
+		Write(t, c)
+
+	// wait for the workload health controller to mark the workload as critical (due to node 1 having critical health)
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadAndNodeHealthConditions[pbcatalog.Health_HEALTH_PASSING][pbcatalog.Health_HEALTH_CRITICAL])
+
+	// Now reassociate the workload with node 2. This should cause recalculation of its health into the warning state
+	workload = rtest.ResourceID(workload.Id).
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{{Host: "198.18.9.8"}},
+			Ports:     map[string]*pbcatalog.WorkloadPort{"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
+			Identity:  "test-lifecycle",
+			// this is the only difference from the previous write
+			NodeName: node2.Id.Name,
+		}).
+		Write(t, c)
+
+	// Wait for the workload health controller to mark the workload as warning (due to node 2 having warning health)
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadAndNodeHealthConditions[pbcatalog.Health_HEALTH_PASSING][pbcatalog.Health_HEALTH_WARNING])
+
+	// Delete the node. This should cause the health to be recalculated as critical because the node association
+	// is broken.
+	c.MustDelete(t, node2.Id)
+
+	// Wait for the workload health controller to mark the workload as critical due to the missing node
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadAndNodeHealthConditions[pbcatalog.Health_HEALTH_PASSING][pbcatalog.Health_HEALTH_CRITICAL])
+
+	// Now fix up the node association to point at node 1
+	workload = rtest.ResourceID(workload.Id).
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{{Host: "198.18.9.8"}},
+			Ports:     map[string]*pbcatalog.WorkloadPort{"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
+			Identity:  "test-lifecycle",
+			// this is the only difference from the previous write
+			NodeName: node1.Id.Name,
+		}).
+		Write(t, c)
+
+	// Also set node 1 health down to WARNING
+	setHealthStatus(t, c, node1.Id, nodeHealthName1, pbcatalog.Health_HEALTH_WARNING)
+
+	// Wait for the workload health controller to mark the workload as warning (due to node 1 having warning health now)
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadAndNodeHealthConditions[pbcatalog.Health_HEALTH_PASSING][pbcatalog.Health_HEALTH_WARNING])
+
+	// Now add a critical workload health check to ensure that both node and workload health are accounted for.
+	setHealthStatus(t, c, workload.Id, workloadHealthName, pbcatalog.Health_HEALTH_CRITICAL)
+
+	// Wait for the workload health to be recomputed and put into the critical status.
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadAndNodeHealthConditions[pbcatalog.Health_HEALTH_CRITICAL][pbcatalog.Health_HEALTH_WARNING])
+
+	// Reset the workload's health to passing. We expect the overall health to go back to warning
+	setHealthStatus(t, c, workload.Id, workloadHealthName, pbcatalog.Health_HEALTH_PASSING)
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadAndNodeHealthConditions[pbcatalog.Health_HEALTH_PASSING][pbcatalog.Health_HEALTH_WARNING])
+
+	// Remove the node association and wait for the health to go back to passing
+	workload = rtest.ResourceID(workload.Id).
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{{Host: "198.18.9.8"}},
+			Ports:     map[string]*pbcatalog.WorkloadPort{"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
+			Identity:  "test-lifecycle",
+		}).
+		Write(t, c)
+	c.WaitForStatusCondition(t, workload.Id,
+		catalog.WorkloadHealthStatusKey,
+		catalog.WorkloadHealthConditions[pbcatalog.Health_HEALTH_PASSING])
+}
+
+// RunCatalogV1Alpha1EndpointsLifecycleIntegrationTest verifies the correct functionality of
+// the endpoints controller. This test will exercise the following behaviors:
+//
+// * Services without a selector get marked with a status indicating their endpoints are unmanaged
+// * Services with a selector get marked with a status indicating their endpoints are managed
+// * Deleting a service will delete the associated endpoints (regardless of whether they were managed or not)
+// * Moving from managed to unmanaged endpoints will delete the managed endpoints
+// * Moving from unmanaged to managed endpoints will overwrite any previous endpoints.
+// * A service with a selector that matches no workloads will still have the endpoints object written.
+// * Adding ports to a service will recalculate the endpoints
+// * Removing ports from a service will recalculate the endpoints
+// * Changing the workload will recalculate the endpoints (ports, addresses, or health)
+func RunCatalogV1Alpha1EndpointsLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
+	c := rtest.NewClient(client)
+	serviceName := "test-lifecycle"
+
+	// Create the service without a selector. We should not see endpoints generated, but we should see the
+	// status updated to note that endpoints are not being managed.
+	service := rtest.Resource(catalog.ServiceV1Alpha1Type, serviceName).
+		WithData(t, &pbcatalog.Service{
+			Ports: []*pbcatalog.ServicePort{{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
+		}).
+		Write(t, c)
+
+	// Wait to ensure the status is updated accordingly
+	c.WaitForStatusCondition(t, service.Id, catalog.EndpointsStatusKey, catalog.EndpointsStatusConditionUnmanaged)
+
+	// Verify that no endpoints were created.
+	endpointsID := rtest.Resource(catalog.ServiceEndpointsV1Alpha1Type, serviceName).ID()
+	c.RequireResourceNotFound(t, endpointsID)
+
+	// Add some empty endpoints (type validations enforce that they are owned by the service)
+	rtest.ResourceID(endpointsID).
+		WithData(t, &pbcatalog.ServiceEndpoints{}).
+		WithOwner(service.Id).
+		Write(t, c)
+
+	// Now delete the service and ensure that the endpoints are cleaned up.
+	c.MustDelete(t, service.Id)
+	c.WaitForDeletion(t, endpointsID)
+
+	// Add some workloads for the service to eventually select
+
+	// api-1 has all ports (http, grpc and mesh). It also has a mixture of Addresses
+	// that select individual ports and one that selects all ports implicitly
+	api1 := rtest.Resource(catalog.WorkloadV1Alpha1Type, "api-1").
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{
+				{Host: "127.0.0.1"},
+				{Host: "::1", Ports: []string{"grpc"}},
+				{Host: "127.0.0.2", Ports: []string{"http"}},
+				{Host: "172.17.1.1", Ports: []string{"mesh"}},
+			},
+			Ports: map[string]*pbcatalog.WorkloadPort{
+				"mesh": {Port: 10000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
+				"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
+				"grpc": {Port: 9090, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
+			},
+			Identity: "api",
+		}).
+		Write(t, c)
+
+	// api-2 has only grpc and mesh ports. It also has a mixture of Addresses that
+	// select individual ports and one that selects all ports implicitly
+	api2 := rtest.Resource(catalog.WorkloadV1Alpha1Type, "api-2").
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{
+				{Host: "127.0.0.1"},
+				{Host: "::1", Ports: []string{"grpc"}},
+				{Host: "172.17.1.2", Ports: []string{"mesh"}},
+			},
+			Ports: map[string]*pbcatalog.WorkloadPort{
+				"mesh": {Port: 10000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
+				"grpc": {Port: 9090, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
+			},
+			Identity: "api",
+		}).
+		Write(t, c)
+
+	// api-3 has the mesh and HTTP ports. It also has a mixture of Addresses that
+	// select individual ports and one that selects all ports.
+	api3 := rtest.Resource(catalog.WorkloadV1Alpha1Type, "api-3").
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{
+				{Host: "127.0.0.1"},
+				{Host: "172.17.1.3", Ports: []string{"mesh"}},
+			},
+			Ports: map[string]*pbcatalog.WorkloadPort{
+				"mesh": {Port: 10000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
+				"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
+			},
+			Identity: "api",
+		}).
+		Write(t, c)
+
+	// Now create a service with unmanaged endpoints again
+	service = rtest.Resource(catalog.ServiceV1Alpha1Type, serviceName).
+		WithData(t, &pbcatalog.Service{
+			Ports: []*pbcatalog.ServicePort{{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
+		}).
+		Write(t, c)
+
+	// Inject the endpoints resource. We want to prove that the transition from unmanaged to
+	// managed endpoints results in overwriting the old endpoints
+	rtest.ResourceID(endpointsID).
+		WithData(t, &pbcatalog.ServiceEndpoints{
+			Endpoints: []*pbcatalog.Endpoint{
+				{
+					Addresses: []*pbcatalog.WorkloadAddress{
+						{Host: "198.18.1.1", External: true},
+					},
+					Ports: map[string]*pbcatalog.WorkloadPort{
+						"http": {Port: 443, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
+					},
+					HealthStatus: pbcatalog.Health_HEALTH_PASSING,
+				},
+			},
+		}).
+		WithOwner(service.Id).
+		Write(t, c)
+
+	// Wait to ensure the status is updated accordingly
+	c.WaitForStatusCondition(t, service.Id, catalog.EndpointsStatusKey, catalog.EndpointsStatusConditionUnmanaged)
+
+	// Now move the service to having managed endpoints
+	service = rtest.ResourceID(service.Id).
+		WithData(t, &pbcatalog.Service{
+			Workloads: &pbcatalog.WorkloadSelector{Names: []string{"bar"}},
+			Ports:     []*pbcatalog.ServicePort{{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
+		}).
+		Write(t, c)
+
+	// Verify that the status is updated to show this service as having managed endpoints
+	c.WaitForStatusCondition(t, service.Id, catalog.EndpointsStatusKey, catalog.EndpointsStatusConditionManaged)
+
+	// Verify that the service endpoints are created. In this case they will be empty
+	verifyServiceEndpoints(t, c, endpointsID, &pbcatalog.ServiceEndpoints{})
+
+	// Rewrite the service to select the API workloads - just select a single port for now
+	service = rtest.ResourceID(service.Id).
+		WithData(t, &pbcatalog.Service{
+			Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"api-"}},
+			Ports:     []*pbcatalog.ServicePort{{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}},
+		}).
+		Write(t, c)
+
+	// Wait for the status to be updated. The condition itself will remain unchanged but we are waiting for
+	// the generations to match to know that the endpoints would have been regenerated
+	c.WaitForStatusCondition(t, service.Id, catalog.EndpointsStatusKey, catalog.EndpointsStatusConditionManaged)
+
+	// ensure that api-1 and api-3 are selected but api-2 is excluded due to not having the desired port
+	verifyServiceEndpoints(t, c, endpointsID, &pbcatalog.ServiceEndpoints{
+		Endpoints: []*pbcatalog.Endpoint{
+			{
+				TargetRef: api1.Id,
+				Addresses: []*pbcatalog.WorkloadAddress{
+					{Host: "127.0.0.1", Ports: []string{"http"}},
+					{Host: "127.0.0.2", Ports: []string{"http"}},
+				},
+				Ports: map[string]*pbcatalog.WorkloadPort{
+					"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
+				},
+				HealthStatus: pbcatalog.Health_HEALTH_PASSING,
+			},
+			{
+				TargetRef: api3.Id,
+				Addresses: []*pbcatalog.WorkloadAddress{
+					{Host: "127.0.0.1", Ports: []string{"http"}},
+				},
+				Ports: map[string]*pbcatalog.WorkloadPort{
+					"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
+				},
+				HealthStatus: pbcatalog.Health_HEALTH_PASSING,
+			},
+		},
+	})
+
+	// Rewrite the service to select the API workloads - changing from selecting the HTTP port to the gRPC port
+	service = rtest.ResourceID(service.Id).
+		WithData(t, &pbcatalog.Service{
+			Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"api-"}},
+			Ports:     []*pbcatalog.ServicePort{{TargetPort: "grpc", Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}},
+		}).
+		Write(t, c)
+
+	// Wait for the status to be updated. The condition itself will remain unchanged but we are waiting for
+	// the generations to match to know that the endpoints would have been regenerated
+	c.WaitForStatusCondition(t, service.Id, catalog.EndpointsStatusKey, catalog.EndpointsStatusConditionManaged)
+
+	// Check that the endpoints were generated as expected
+	verifyServiceEndpoints(t, c, endpointsID, &pbcatalog.ServiceEndpoints{
+		Endpoints: []*pbcatalog.Endpoint{
+			{
+				TargetRef: api1.Id,
+				Addresses: []*pbcatalog.WorkloadAddress{
+					{Host: "127.0.0.1", Ports: []string{"grpc"}},
+					{Host: "::1", Ports: []string{"grpc"}},
+				},
+				Ports: map[string]*pbcatalog.WorkloadPort{
+					"grpc": {Port: 9090, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
+				},
+				HealthStatus: pbcatalog.Health_HEALTH_PASSING,
+			},
+			{
+				TargetRef: api2.Id,
+				Addresses: []*pbcatalog.WorkloadAddress{
+					{Host: "127.0.0.1", Ports: []string{"grpc"}},
+					{Host: "::1", Ports: []string{"grpc"}},
+				},
+				Ports: map[string]*pbcatalog.WorkloadPort{
+					"grpc": {Port: 9090, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
+				},
+				HealthStatus: pbcatalog.Health_HEALTH_PASSING,
+			},
+		},
+	})
+
+	// Update the api-2 workload to change the ports it uses. This should result in the workload being removed
+	// from the endpoints
+	rtest.ResourceID(api2.Id).
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{
+				{Host: "127.0.0.1"},
+				{Host: "::1", Ports: []string{"http"}},
+				{Host: "172.17.1.2", Ports: []string{"mesh"}},
+			},
+			Ports: map[string]*pbcatalog.WorkloadPort{
+				"mesh": {Port: 10000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
+				"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
+			},
+			Identity: "api",
+		}).
+		Write(t, c)
+
+	// Verify that api-2 was removed from the service endpoints as it no longer has a grpc port
+	verifyServiceEndpoints(t, c, endpointsID, &pbcatalog.ServiceEndpoints{
+		Endpoints: []*pbcatalog.Endpoint{
+			{
+				TargetRef: api1.Id,
+				Addresses: []*pbcatalog.WorkloadAddress{
+					{Host: "127.0.0.1", Ports: []string{"grpc"}},
+					{Host: "::1", Ports: []string{"grpc"}},
+				},
+				Ports: map[string]*pbcatalog.WorkloadPort{
+					"grpc": {Port: 9090, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
+				},
+				HealthStatus: pbcatalog.Health_HEALTH_PASSING,
+			},
+		},
+	})
+
+	// Remove the ::1 address from workload api-1, which should result in recomputing endpoints
+	rtest.ResourceID(api1.Id).
+		WithData(t, &pbcatalog.Workload{
+			Addresses: []*pbcatalog.WorkloadAddress{
+				{Host: "127.0.0.1"},
+				{Host: "172.17.1.1", Ports: []string{"mesh"}},
+			},
+			Ports: map[string]*pbcatalog.WorkloadPort{
+				"mesh": {Port: 10000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
+				"grpc": {Port: 9090, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
+			},
+			Identity: "api",
+		}).
+		Write(t, c)
+
+	// Verify that api-1 had its addresses modified appropriately
+	verifyServiceEndpoints(t, c, endpointsID, &pbcatalog.ServiceEndpoints{
+		Endpoints: []*pbcatalog.Endpoint{
+			{
+				TargetRef: api1.Id,
+				Addresses: []*pbcatalog.WorkloadAddress{
+					{Host: "127.0.0.1", Ports: []string{"grpc"}},
+				},
+				Ports: map[string]*pbcatalog.WorkloadPort{
+					"grpc": {Port: 9090, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
+				},
+				HealthStatus: pbcatalog.Health_HEALTH_PASSING,
+			},
+		},
+	})
+
+	// Add a failing health status to the api1 workload to force recomputation of endpoints
+	setHealthStatus(t, c, api1.Id, "api-failed", pbcatalog.Health_HEALTH_CRITICAL)
+
+	// Verify that api-1 within the endpoints has the expected health
+	verifyServiceEndpoints(t, c, endpointsID, &pbcatalog.ServiceEndpoints{
+		Endpoints: []*pbcatalog.Endpoint{
+			{
+				TargetRef: api1.Id,
+				Addresses: []*pbcatalog.WorkloadAddress{
+					{Host: "127.0.0.1", Ports: []string{"grpc"}},
+				},
+				Ports: map[string]*pbcatalog.WorkloadPort{
+					"grpc": {Port: 9090, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
+				},
+				HealthStatus: pbcatalog.Health_HEALTH_CRITICAL,
+			},
+		},
+	})
+
+	// Move the service to being unmanaged. We should see the ServiceEndpoints being removed.
+	service = rtest.ResourceID(service.Id).
+		WithData(t, &pbcatalog.Service{
+			Ports: []*pbcatalog.ServicePort{{TargetPort: "grpc", Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}},
+		}).
+		Write(t, c)
+
+	// Wait for the endpoints controller to inform us that the endpoints are not being managed
+	c.WaitForStatusCondition(t, service.Id, catalog.EndpointsStatusKey, catalog.EndpointsStatusConditionUnmanaged)
+	// Ensure that the managed endpoints were deleted
+	c.WaitForDeletion(t, endpointsID)
+
+	// Put the service back into managed mode.
+	service = rtest.ResourceID(service.Id).
+		WithData(t, &pbcatalog.Service{
+			Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"api-"}},
+			Ports:     []*pbcatalog.ServicePort{{TargetPort: "grpc", Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}},
+		}).
+		Write(t, c)
+
+	// Wait for the service endpoints to be regenerated
+	c.WaitForStatusCondition(t, service.Id, catalog.EndpointsStatusKey, catalog.EndpointsStatusConditionManaged)
+	c.RequireResourceExists(t, endpointsID)
+
+	// Now delete the service and ensure that the endpoints eventually are deleted as well
+	c.MustDelete(t, service.Id)
+	c.WaitForDeletion(t, endpointsID)
+
+}
+
+func setHealthStatus(t *testing.T, client *rtest.Client, owner *pbresource.ID, name string, health pbcatalog.Health) *pbresource.Resource {
+	return rtest.Resource(catalog.HealthStatusV1Alpha1Type, name).
+		WithData(t, &pbcatalog.HealthStatus{
+			Type:   "synthetic",
+			Status: health,
+		}).
+		WithOwner(owner).
+		Write(t, client)
+}
diff --git a/internal/catalog/exports.go b/internal/catalog/exports.go
index 61247091be1c..e0373bf7079b 100644
--- a/internal/catalog/exports.go
+++ b/internal/catalog/exports.go
@@ -5,6 +5,9 @@ package catalog
 
 import (
 	"github.com/hashicorp/consul/internal/catalog/internal/controllers"
+	"github.com/hashicorp/consul/internal/catalog/internal/controllers/endpoints"
+	"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
+	"github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth"
 	"github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper"
 	"github.com/hashicorp/consul/internal/catalog/internal/mappers/selectiontracker"
 	"github.com/hashicorp/consul/internal/catalog/internal/types"
@@ -40,6 +43,21 @@ var (
 	HealthStatusV1Alpha1Type = types.HealthStatusV1Alpha1Type
 	HealthChecksV1Alpha1Type = types.HealthChecksV1Alpha1Type
 	DNSPolicyV1Alpha1Type    = types.DNSPolicyV1Alpha1Type
+
+	// Controller Statuses
+	NodeHealthStatusKey              = nodehealth.StatusKey
+	NodeHealthStatusConditionHealthy = nodehealth.StatusConditionHealthy
+	NodeHealthConditions             = nodehealth.Conditions
+
+	WorkloadHealthStatusKey              = workloadhealth.StatusKey
+	WorkloadHealthStatusConditionHealthy = workloadhealth.StatusConditionHealthy
+	WorkloadHealthConditions             = workloadhealth.WorkloadConditions
+	WorkloadAndNodeHealthConditions      = workloadhealth.NodeAndWorkloadConditions
+
+	EndpointsStatusKey                       = endpoints.StatusKey
+	EndpointsStatusConditionEndpointsManaged = endpoints.StatusConditionEndpointsManaged
+	EndpointsStatusConditionManaged          = endpoints.ConditionManaged
+	EndpointsStatusConditionUnmanaged        = endpoints.ConditionUnmanaged
 )
 
 // RegisterTypes adds all resource types within the "catalog" API group
diff --git a/internal/resource/resourcetest/builder.go b/internal/resource/resourcetest/builder.go
index 7355f38824ec..8e66028e893b 100644
--- a/internal/resource/resourcetest/builder.go
+++ b/internal/resource/resourcetest/builder.go
@@ -3,9 +3,12 @@ package resourcetest
 import (
 	"context"
 
+	"github.com/hashicorp/consul/internal/resource"
 	"github.com/hashicorp/consul/proto-public/pbresource"
 	"github.com/oklog/ulid/v2"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/reflect/protoreflect"
 	"google.golang.org/protobuf/types/known/anypb"
@@ -37,6 +40,14 @@ func Resource(rtype *pbresource.Type, name string) *resourceBuilder {
 	}
 }
 
+func ResourceID(id *pbresource.ID) *resourceBuilder {
+	return &resourceBuilder{
+		resource: &pbresource.Resource{
+			Id: id,
+		},
+	}
+}
+
 func (b *resourceBuilder) WithData(t T, data protoreflect.ProtoMessage) *resourceBuilder {
 	t.Helper()
 
@@ -123,7 +134,11 @@ func (b *resourceBuilder) Write(t T, client pbresource.ResourceServiceClient) *p
 		_, err := client.Delete(context.Background(), &pbresource.DeleteRequest{
 			Id: rsp.Resource.Id,
 		})
-		require.NoError(t, err)
+
+		// ignore not found errors
+		if err != nil && status.Code(err) != codes.NotFound {
+			t.Fatalf("Failed to delete resource %s of type %s: %v", rsp.Resource.Id.Name, resource.ToGVK(rsp.Resource.Id.Type), err)
+		}
 	})
 }
 
diff --git a/internal/resource/resourcetest/client.go b/internal/resource/resourcetest/client.go
index dab5b03c3adb..d29ccda4199d 100644
--- a/internal/resource/resourcetest/client.go
+++ b/internal/resource/resourcetest/client.go
@@ -35,6 +35,7 @@ func (client *Client) SetRetryerConfig(timeout time.Duration, wait time.Duration
 }
 
 func (client *Client) retry(t T, fn func(r *retry.R)) {
+	t.Helper()
 	retryer := &retry.Timer{Timeout: client.timeout, Wait: client.wait}
 	retry.RunWith(retryer, t, fn)
 }
@@ -181,7 +182,7 @@ func (client *Client) WaitForStatusCondition(t T, id *pbresource.ID, statusKey s
 
 	var res *pbresource.Resource
 	client.retry(t, func(r *retry.R) {
-		res = client.RequireStatusConditionForCurrentGen(t, id, statusKey, condition)
+		res = client.RequireStatusConditionForCurrentGen(r, id, statusKey, condition)
 	})
 
 	return res
@@ -209,6 +210,14 @@ func (client *Client) WaitForResourceState(t T, id *pbresource.ID, verify func(T
 	return res
 }
 
+func (client *Client) WaitForDeletion(t T, id *pbresource.ID) {
+	t.Helper()
+
+	client.retry(t, func(r *retry.R) {
+		client.RequireResourceNotFound(r, id)
+	})
+}
+
 // ResolveResourceID will read the specified resource and returns its full ID.
 // This is mainly useful to get the ID with the Uid filled out.
 func (client *Client) ResolveResourceID(t T, id *pbresource.ID) *pbresource.ID {
@@ -216,3 +225,12 @@ func (client *Client) ResolveResourceID(t T, id *pbresource.ID) *pbresource.ID {
 
 	return client.RequireResourceExists(t, id).Id
 }
+
+func (client *Client) MustDelete(t T, id *pbresource.ID) {
+	_, err := client.Delete(context.Background(), &pbresource.DeleteRequest{Id: id})
+	if status.Code(err) == codes.NotFound {
+		return
+	}
+
+	require.NoError(t, err)
+}

From 77a78ef5f7d8049ba7756bb508a2f99c96c47d04 Mon Sep 17 00:00:00 2001
From: Matt Keeler
Date: Wed, 14 Jun 2023 13:10:46 -0400
Subject: [PATCH 2/2] Prevent triggering the race detector.

This allows defining some variables for protobuf constants and using
those in comparisons. Without that, something internal in the fmt
package ended up looking at the protobuf message size cache and
triggering the race detector.
---
 proto/private/prototest/testing.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/proto/private/prototest/testing.go b/proto/private/prototest/testing.go
index 28341012afa6..b423478155d1 100644
--- a/proto/private/prototest/testing.go
+++ b/proto/private/prototest/testing.go
@@ -100,5 +100,5 @@ func AssertContainsElement[V any](t TestingT, list []V, element V, opts ...cmp.O
 		}
 	}
 
-	t.Fatalf("assertion failed: list does not contain element\n--- list\n%#v\n--- element: %#v", list, element)
+	t.Fatalf("assertion failed: list does not contain element\n--- list\n%+v\n--- element: %+v", list, element)
 }
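
For context on the second patch, a minimal standalone sketch (not part of either patch; the HealthStatus value is simply borrowed from the test above) of the formatting difference it relies on: %+v formats a protobuf message through its String() method, while %#v reflects over the struct directly, including internal bookkeeping such as the message size cache that the commit message describes.

package main

import (
	"fmt"

	pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
)

func main() {
	// A protobuf value of the kind the tests share as package-level condition variables.
	hs := &pbcatalog.HealthStatus{Type: "synthetic", Status: pbcatalog.Health_HEALTH_PASSING}

	// %+v renders the message via its String() method and does not touch
	// unexported internals.
	fmt.Printf("%+v\n", hs)

	// %#v ignores String() and reflects over every field, including internal
	// bookkeeping like the size cache; doing that while another goroutine
	// marshals the same shared value can be flagged by the race detector.
	fmt.Printf("%#v\n", hs)
}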