Skip to content

Commit

Permalink
Implement data filtering of some endpoints (#5579)
Browse files Browse the repository at this point in the history
Fixes: #4222 

# Data Filtering

This PR will implement filtering for the following endpoints:

## Supported HTTP Endpoints

- `/agent/checks`
- `/agent/services`
- `/catalog/nodes`
- `/catalog/service/:service`
- `/catalog/connect/:service`
- `/catalog/node/:node`
- `/health/node/:node`
- `/health/checks/:service`
- `/health/service/:service`
- `/health/connect/:service`
- `/health/state/:state`
- `/internal/ui/nodes`
- `/internal/ui/services`

More can be added going forward and any endpoint which is used to list some data is a good candidate.

## Usage

When using the HTTP API a `filter` query parameter can be used to pass a filter expression to Consul. Filter Expressions take the general form of:

```
<selector> == <value>
<selector> != <value>
<value> in <selector>
<value> not in <selector>
<selector> contains <value>
<selector> not contains <value>
<selector> is empty
<selector> is not empty
not <other expression>
<expression 1> and <expression 2>
<expression 1> or <expression 2>
```

Normal boolean logic and precedence is supported. All of the actual filtering and evaluation logic is coming from the [go-bexpr](https://github.com/hashicorp/go-bexpr) library

## Other changes

Adding the `Internal.ServiceDump` RPC endpoint. This will allow the UI to filter services better.
  • Loading branch information
mkeeler authored Apr 16, 2019
1 parent ffc5c33 commit afa1cc9
Show file tree
Hide file tree
Showing 71 changed files with 8,808 additions and 509 deletions.
20 changes: 18 additions & 2 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/logutils"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
Expand Down Expand Up @@ -219,6 +220,9 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
var token string
s.parseToken(req, &token)

var filterExpression string
s.parseFilter(req, &filterExpression)

services := s.agent.State.Services()
if err := s.agent.filterServices(token, &services); err != nil {
return nil, err
Expand All @@ -238,7 +242,12 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
agentSvcs[id] = &agentService
}

return agentSvcs, nil
filter, err := bexpr.CreateFilter(filterExpression, nil, agentSvcs)
if err != nil {
return nil, err
}

return filter.Execute(agentSvcs)
}

// GET /v1/agent/service/:service_id
Expand Down Expand Up @@ -403,6 +412,13 @@ func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (i
var token string
s.parseToken(req, &token)

var filterExpression string
s.parseFilter(req, &filterExpression)
filter, err := bexpr.CreateFilter(filterExpression, nil, nil)
if err != nil {
return nil, err
}

checks := s.agent.State.Checks()
if err := s.agent.filterChecks(token, &checks); err != nil {
return nil, err
Expand All @@ -417,7 +433,7 @@ func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (i
}
}

return checks, nil
return filter.Execute(checks)
}

func (s *HTTPServer) AgentMembers(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down
76 changes: 75 additions & 1 deletion agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"strings"
Expand All @@ -27,8 +28,8 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/serf/serf"
Expand Down Expand Up @@ -101,6 +102,48 @@ func TestAgent_Services(t *testing.T) {
assert.Equal(t, prxy1.Upstreams.ToAPI(), val["mysql"].Connect.Proxy.Upstreams)
}

func TestAgent_ServicesFiltered(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv1 := &structs.NodeService{
ID: "mysql",
Service: "mysql",
Tags: []string{"master"},
Meta: map[string]string{
"foo": "bar",
},
Port: 5000,
}
require.NoError(t, a.State.AddService(srv1, ""))

// Add another service
srv2 := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"kv"},
Meta: map[string]string{
"foo": "bar",
},
Port: 1234,
}
require.NoError(t, a.State.AddService(srv2, ""))

req, _ := http.NewRequest("GET", "/v1/agent/services?filter="+url.QueryEscape("foo in Meta"), nil)
obj, err := a.srv.AgentServices(nil, req)
require.NoError(t, err)
val := obj.(map[string]*api.AgentService)
require.Len(t, val, 2)

req, _ = http.NewRequest("GET", "/v1/agent/services?filter="+url.QueryEscape("kv in Tags"), nil)
obj, err = a.srv.AgentServices(nil, req)
require.NoError(t, err)
val = obj.(map[string]*api.AgentService)
require.Len(t, val, 1)
}

// This tests that the agent services endpoint (/v1/agent/services) returns
// Connect proxies.
func TestAgent_Services_ExternalConnectProxy(t *testing.T) {
Expand Down Expand Up @@ -629,6 +672,37 @@ func TestAgent_Checks(t *testing.T) {
}
}

func TestAgent_ChecksWithFilter(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

testrpc.WaitForTestAgent(t, a.RPC, "dc1")
chk1 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mysql",
Name: "mysql",
Status: api.HealthPassing,
}
a.State.AddCheck(chk1, "")

chk2 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "redis",
Name: "redis",
Status: api.HealthPassing,
}
a.State.AddCheck(chk2, "")

req, _ := http.NewRequest("GET", "/v1/agent/checks?filter="+url.QueryEscape("Name == `redis`"), nil)
obj, err := a.srv.AgentChecks(nil, req)
require.NoError(t, err)
val := obj.(map[types.CheckID]*structs.HealthCheck)
require.Len(t, val, 1)
_, ok := val["redis"]
require.True(t, ok)
}

func TestAgent_HealthServiceByID(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
Expand Down
177 changes: 176 additions & 1 deletion agent/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -153,6 +154,42 @@ func TestCatalogNodes_MetaFilter(t *testing.T) {
}
}

func TestCatalogNodes_Filter(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

// Register a node with a meta field
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
},
}

var out struct{}
require.NoError(t, a.RPC("Catalog.Register", args, &out))

req, _ := http.NewRequest("GET", "/v1/catalog/nodes?filter="+url.QueryEscape("Meta.somekey == somevalue"), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogNodes(resp, req)
require.NoError(t, err)

// Verify an index is set
assertIndex(t, resp)

// Verify we only get the node with the correct meta field back
nodes := obj.(structs.Nodes)
require.Len(t, nodes, 1)

v, ok := nodes[0].Meta["somekey"]
require.True(t, ok)
require.Equal(t, v, "somevalue")
}

func TestCatalogNodes_WanTranslation(t *testing.T) {
t.Parallel()
a1 := NewTestAgent(t, t.Name(), `
Expand Down Expand Up @@ -651,6 +688,69 @@ func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) {
}
}

func TestCatalogServiceNodes_Filter(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

queryPath := "/v1/catalog/service/api?filter=" + url.QueryEscape("ServiceMeta.somekey == somevalue")

// Make sure an empty list is returned, not a nil
{
req, _ := http.NewRequest("GET", queryPath, nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogServiceNodes(resp, req)
require.NoError(t, err)

assertIndex(t, resp)

nodes := obj.(structs.ServiceNodes)
require.Empty(t, nodes)
}

// Register node
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "api",
Meta: map[string]string{
"somekey": "somevalue",
},
},
}

var out struct{}
require.NoError(t, a.RPC("Catalog.Register", args, &out))

// Register a second service for the node
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "api2",
Service: "api",
Meta: map[string]string{
"somekey": "notvalue",
},
},
SkipNodeUpdate: true,
}

require.NoError(t, a.RPC("Catalog.Register", args, &out))

req, _ := http.NewRequest("GET", queryPath, nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogServiceNodes(resp, req)
require.NoError(t, err)
assertIndex(t, resp)

nodes := obj.(structs.ServiceNodes)
require.Len(t, nodes, 1)
}

func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
t.Parallel()
a1 := NewTestAgent(t, t.Name(), `
Expand Down Expand Up @@ -884,6 +984,44 @@ func TestCatalogConnectServiceNodes_good(t *testing.T) {
assert.Equal(args.Service.Proxy, nodes[0].ServiceProxy)
}

func TestCatalogConnectServiceNodes_Filter(t *testing.T) {
t.Parallel()

a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")

// Register
args := structs.TestRegisterRequestProxy(t)
args.Service.Address = "127.0.0.55"
var out struct{}
require.NoError(t, a.RPC("Catalog.Register", args, &out))

args = structs.TestRegisterRequestProxy(t)
args.Service.Address = "127.0.0.55"
args.Service.Meta = map[string]string{
"version": "2",
}
args.Service.ID = "web-proxy2"
args.SkipNodeUpdate = true
require.NoError(t, a.RPC("Catalog.Register", args, &out))

req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/catalog/connect/%s?filter=%s",
args.Service.Proxy.DestinationServiceName,
url.QueryEscape("ServiceMeta.version == 2")), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogConnectServiceNodes(resp, req)
require.NoError(t, err)
assertIndex(t, resp)

nodes := obj.(structs.ServiceNodes)
require.Len(t, nodes, 1)
require.Equal(t, structs.ServiceKindConnectProxy, nodes[0].ServiceKind)
require.Equal(t, args.Service.Address, nodes[0].ServiceAddress)
require.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy)
}

func TestCatalogNodeServices(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
Expand Down Expand Up @@ -927,6 +1065,43 @@ func TestCatalogNodeServices(t *testing.T) {
require.Equal(t, args.Service.Proxy, services.Services["web-proxy"].Proxy)
}

func TestCatalogNodeServices_Filter(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

// Register node with a regular service and connect proxy
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "api",
Tags: []string{"a"},
},
}

var out struct{}
require.NoError(t, a.RPC("Catalog.Register", args, &out))

// Register a connect proxy
args.Service = structs.TestNodeServiceProxy(t)
require.NoError(t, a.RPC("Catalog.Register", args, &out))

req, _ := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc1&filter="+url.QueryEscape("Kind == `connect-proxy`"), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogNodeServices(resp, req)
require.NoError(t, err)
assertIndex(t, resp)

services := obj.(*structs.NodeServices)
require.Len(t, services.Services, 1)

// Proxy service should have it's config intact
require.Equal(t, args.Service.Proxy, services.Services["web-proxy"].Proxy)
}

// Test that the services on a node contain all the Connect proxies on
// the node as well with their fields properly populated.
func TestCatalogNodeServices_ConnectProxy(t *testing.T) {
Expand Down
Loading

0 comments on commit afa1cc9

Please sign in to comment.