Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: generalise event path using CEL
Browse files Browse the repository at this point in the history
Allow users to handle events where multiple events are provided via single JSON
objects in an array field. This is enabled by providing an optional CEL program
with only minimal support for CEL evaluations; no extensions from the mito lib
extensions are made available.
  • Loading branch information
efd6 committed Mar 6, 2024
1 parent 85e4e46 commit 7ffb35a
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 12 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add ETW input. {pull}36915[36915]
- Update CEL mito extensions to v1.9.0 to add keys/values helper. {pull}37971[37971]
- Add logging for cache processor file reads and writes. {pull}38052[38052]
- Add parseDateInTZ value template for the HTTPJSON input {pull}37738[37738]
- Add parseDateInTZ value template for the HTTPJSON input. {pull}37738[37738]
- Add support for complex event objects in the HTTP Endpoint input. {issue}37910[37910] {pull}38193[38193]

*Auditbeat*

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ The prefix for the signature. Certain webhooks prefix the HMAC signature with a
By default the input expects the incoming POST to include a Content-Type of `application/json` to try to enforce the incoming data to be valid JSON.
In certain scenarios when the source of the request is not able to do that, it can be overwritten with another value or set to null.

[float]
==== `program`

The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be treated handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a CEL program can be provided through this configuration field. No CEL extensions are provided beyond the function in the CEL standard library. CEL optional types are supported.

[float]
==== `response_code`

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type config struct {
URL string `config:"url" validate:"required"`
Prefix string `config:"prefix"`
ContentType string `config:"content_type"`
Program string `config:"program"`
SecretHeader string `config:"secret.header"`
SecretValue string `config:"secret.value"`
HMACHeader string `config:"hmac.header"`
Expand Down
117 changes: 111 additions & 6 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ import (
"io"
"net"
"net/http"
"reflect"
"time"

"github.com/google/cel-go/cel"
"github.com/google/cel-go/checker/decls"
"github.com/google/cel-go/common/types"
"github.com/google/cel-go/common/types/ref"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/structpb"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -24,6 +30,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/mito/lib"
)

const headerContentEncoding = "Content-Encoding"
Expand All @@ -43,6 +50,7 @@ type handler struct {
reqLogger *zap.Logger
host, scheme string

program *program
messageField string
responseCode int
responseBody string
Expand Down Expand Up @@ -80,7 +88,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(&buf)
}

objs, _, status, err := httpReadJSON(body)
objs, _, status, err := httpReadJSON(body, h.program)
if err != nil {
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
Expand Down Expand Up @@ -218,32 +226,45 @@ func (h *handler) publishEvent(obj, headers mapstr.M) error {
return nil
}

func httpReadJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, status int, err error) {
func httpReadJSON(body io.Reader, prg *program) (objs []mapstr.M, rawMessages []json.RawMessage, status int, err error) {
if body == http.NoBody {
return nil, nil, http.StatusNotAcceptable, errBodyEmpty
}
obj, rawMessage, err := decodeJSON(body)
obj, rawMessage, err := decodeJSON(body, prg)
if err != nil {
return nil, nil, http.StatusBadRequest, err
}
return obj, rawMessage, http.StatusOK, err
}

func decodeJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, err error) {
func decodeJSON(body io.Reader, prg *program) (objs []mapstr.M, rawMessages []json.RawMessage, err error) {
decoder := json.NewDecoder(body)
for decoder.More() {
var raw json.RawMessage
if err := decoder.Decode(&raw); err != nil {
if err = decoder.Decode(&raw); err != nil {
if err == io.EOF { //nolint:errorlint // This will never be a wrapped error.
break
}
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
}

var obj interface{}
if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
if err = newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
}

if prg != nil {
obj, err = prg.eval(obj)
if err != nil {
return nil, nil, err
}
// Re-marshal to ensure the raw bytes agree with the constructed object.
raw, err = json.Marshal(obj)
if err != nil {
return nil, nil, fmt.Errorf("failed to remarshal object: %w", err)
}
}

switch v := obj.(type) {
case map[string]interface{}:
objs = append(objs, v)
Expand All @@ -265,6 +286,90 @@ func decodeJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage,
return objs, rawMessages, nil
}

type program struct {
prg cel.Program
ast *cel.Ast
}

func newProgram(src string) (*program, error) {
if src == "" {
return nil, nil
}

registry, err := types.NewRegistry()
if err != nil {
return nil, fmt.Errorf("failed to create env: %v", err)

Check failure on line 301 in x-pack/filebeat/input/http_endpoint/handler.go

View workflow job for this annotation

GitHub Actions / lint (windows)

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}
env, err := cel.NewEnv(
cel.Declarations(decls.NewVar("obj", decls.Dyn)),
cel.OptionalTypes(cel.OptionalTypesVersion(lib.OptionalTypesVersion)),
cel.CustomTypeAdapter(&numberAdapter{registry}),
cel.CustomTypeProvider(registry),
)
if err != nil {
return nil, fmt.Errorf("failed to create env: %v", err)

Check failure on line 310 in x-pack/filebeat/input/http_endpoint/handler.go

View workflow job for this annotation

GitHub Actions / lint (windows)

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}

ast, iss := env.Compile(src)
if iss.Err() != nil {
return nil, fmt.Errorf("failed compilation: %v", iss.Err())

Check failure on line 315 in x-pack/filebeat/input/http_endpoint/handler.go

View workflow job for this annotation

GitHub Actions / lint (windows)

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}

prg, err := env.Program(ast)
if err != nil {
return nil, fmt.Errorf("failed program instantiation: %v", err)
}
return &program{prg: prg, ast: ast}, nil
}

var _ types.Adapter = (*numberAdapter)(nil)

type numberAdapter struct {
fallback types.Adapter
}

func (a *numberAdapter) NativeToValue(value any) ref.Val {
if n, ok := value.(json.Number); ok {
var errs []error
i, err := n.Int64()
if err == nil {
return types.Int(i)
}
errs = append(errs, err)
f, err := n.Float64()
if err == nil {
return types.Double(f)
}
errs = append(errs, err)
return types.NewErr("%v", errors.Join(errs...))
}
return a.fallback.NativeToValue(value)
}

func (p *program) eval(obj interface{}) (interface{}, error) {
out, _, err := p.prg.Eval(map[string]interface{}{"obj": obj})
if err != nil {
err = lib.DecoratedError{AST: p.ast, Err: err}
return nil, fmt.Errorf("failed eval: %w", err)
}

v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Value)(nil)))
if err != nil {
return nil, fmt.Errorf("failed proto conversion: %w", err)
}
switch v := v.(type) {
case *structpb.Value:
return v.AsInterface(), nil
default:
// This should never happen.
return nil, fmt.Errorf("unexpected native conversion type: %T", v)
}
}

func errorMessage(msg string) map[string]interface{} {

Check failure on line 369 in x-pack/filebeat/input/http_endpoint/handler.go

View workflow job for this annotation

GitHub Actions / lint (windows)

func `errorMessage` is unused (unused)
return map[string]interface{}{"error": map[string]interface{}{"message": msg}}
}

func decodeJSONArray(raw *bytes.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, err error) {
dec := newJSONDecoder(raw)
token, err := dec.Token()
Expand Down
38 changes: 36 additions & 2 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func Test_httpReadJSON(t *testing.T) {
tests := []struct {
name string
body string
program string
wantObjs []mapstr.M
wantStatus int
wantErr bool
Expand Down Expand Up @@ -135,10 +136,43 @@ func Test_httpReadJSON(t *testing.T) {
},
wantStatus: http.StatusOK,
},
{
name: "kinesis",
body: `{
"requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
"timestamp": 1578090901599,
"records": [
{
"data": "aGVsbG8="
},
{
"data": "aGVsbG8gd29ybGQ="
}
]
}`,
program: `obj.records.map(r, {
"requestId": obj.requestId,
"timestamp": string(obj.timestamp), // leave timestamp in unix milli for ingest to handle.
"event": r,
})`,
wantRawMessage: []json.RawMessage{
[]byte(`{"event":{"data":"aGVsbG8="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"aGVsbG8gd29ybGQ="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
},
wantObjs: []mapstr.M{
{"event": map[string]any{"data": "aGVsbG8="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "aGVsbG8gd29ybGQ="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
},
wantStatus: http.StatusOK,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotObjs, rawMessages, gotStatus, err := httpReadJSON(strings.NewReader(tt.body))
prg, err := newProgram(tt.program)
if err != nil {
t.Fatalf("failed to compile program: %v", err)
}
gotObjs, rawMessages, gotStatus, err := httpReadJSON(strings.NewReader(tt.body), prg)
if (err != nil) != tt.wantErr {
t.Errorf("httpReadJSON() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -344,7 +378,7 @@ func Test_apiResponse(t *testing.T) {
pub := new(publisher)
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), pub, logp.NewLogger("http_endpoint.test"), metrics)
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand Down
15 changes: 12 additions & 3 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m
metrics.route.Set(u.Path)
metrics.isTLS.Set(e.tlsConfig != nil)

var prg *program
if e.config.Program != "" {
prg, err = newProgram(e.config.Program)
if err != nil {
return err
}
}

p.mu.Lock()
s, ok := p.servers[e.addr]
if ok {
Expand All @@ -149,7 +157,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m
return err
}
log.Infof("Adding %s end point to server on %s", pattern, e.addr)
s.mux.Handle(pattern, newHandler(s.ctx, e.config, pub, log, metrics))
s.mux.Handle(pattern, newHandler(s.ctx, e.config, prg, pub, log, metrics))
s.idOf[pattern] = ctx.ID
p.mu.Unlock()
<-s.ctx.Done()
Expand All @@ -165,7 +173,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m
srv: srv,
}
s.ctx, s.cancel = ctxtool.WithFunc(ctx.Cancelation, func() { srv.Close() })
mux.Handle(pattern, newHandler(s.ctx, e.config, pub, log, metrics))
mux.Handle(pattern, newHandler(s.ctx, e.config, prg, pub, log, metrics))
p.servers[e.addr] = s
p.mu.Unlock()

Expand Down Expand Up @@ -287,7 +295,7 @@ func (s *server) getErr() error {
return s.err
}

func newHandler(ctx context.Context, c config, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
h := &handler{
log: log,
publisher: pub,
Expand All @@ -305,6 +313,7 @@ func newHandler(ctx context.Context, c config, pub stateless.Publisher, log *log
hmacType: c.HMACType,
hmacPrefix: c.HMACPrefix,
},
program: prg,
messageField: c.Prefix,
responseCode: c.ResponseCode,
responseBody: c.ResponseBody,
Expand Down

0 comments on commit 7ffb35a

Please sign in to comment.