Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/http_endpoint: generalise event path using CEL #38193

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add logging for cache processor file reads and writes. {pull}38052[38052]
- Add parseDateInTZ value template for the HTTPJSON input {pull}37738[37738]
- Improve rate limit handling by HTTPJSON {issue}36207[36207] {pull}38161[38161] {pull}38237[38237]
- Add parseDateInTZ value template for the HTTPJSON input. {pull}37738[37738]
- Add support for complex event objects in the HTTP Endpoint input. {issue}37910[37910] {pull}38193[38193]

*Auditbeat*

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ The prefix for the signature. Certain webhooks prefix the HMAC signature with a
By default the input expects the incoming POST to include a Content-Type of `application/json` to try to enforce the incoming data to be valid JSON.
In certain scenarios when the source of the request is not able to do that, it can be overwritten with another value or set to null.

[float]
==== `program`

The normal operation of the input treats the body either as a single event when the body is an object, or as a set of events when the body is an array. If the body should be treated handled differently, for example a set of events in an array field of an object to be handled as a set of events, then a CEL program can be provided through this configuration field. No CEL extensions are provided beyond the function in the CEL standard library. CEL optional types are supported.
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

[float]
==== `response_code`

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type config struct {
URL string `config:"url" validate:"required"`
Prefix string `config:"prefix"`
ContentType string `config:"content_type"`
Program string `config:"program"`
SecretHeader string `config:"secret.header"`
SecretValue string `config:"secret.value"`
HMACHeader string `config:"hmac.header"`
Expand Down
113 changes: 107 additions & 6 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ import (
"io"
"net"
"net/http"
"reflect"
"time"

"github.com/google/cel-go/cel"
"github.com/google/cel-go/checker/decls"
"github.com/google/cel-go/common/types"
"github.com/google/cel-go/common/types/ref"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/structpb"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -24,6 +30,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/mito/lib"
)

const headerContentEncoding = "Content-Encoding"
Expand All @@ -43,6 +50,7 @@ type handler struct {
reqLogger *zap.Logger
host, scheme string

program *program
messageField string
responseCode int
responseBody string
Expand Down Expand Up @@ -80,7 +88,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(&buf)
}

objs, _, status, err := httpReadJSON(body)
objs, _, status, err := httpReadJSON(body, h.program)
if err != nil {
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
Expand Down Expand Up @@ -218,32 +226,45 @@ func (h *handler) publishEvent(obj, headers mapstr.M) error {
return nil
}

func httpReadJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, status int, err error) {
func httpReadJSON(body io.Reader, prg *program) (objs []mapstr.M, rawMessages []json.RawMessage, status int, err error) {
if body == http.NoBody {
return nil, nil, http.StatusNotAcceptable, errBodyEmpty
}
obj, rawMessage, err := decodeJSON(body)
obj, rawMessage, err := decodeJSON(body, prg)
if err != nil {
return nil, nil, http.StatusBadRequest, err
}
return obj, rawMessage, http.StatusOK, err
}

func decodeJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, err error) {
func decodeJSON(body io.Reader, prg *program) (objs []mapstr.M, rawMessages []json.RawMessage, err error) {
decoder := json.NewDecoder(body)
for decoder.More() {
var raw json.RawMessage
if err := decoder.Decode(&raw); err != nil {
if err = decoder.Decode(&raw); err != nil {
if err == io.EOF { //nolint:errorlint // This will never be a wrapped error.
break
}
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
}

var obj interface{}
if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
if err = newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
}

if prg != nil {
obj, err = prg.eval(obj)
if err != nil {
return nil, nil, err
}
// Re-marshal to ensure the raw bytes agree with the constructed object.
raw, err = json.Marshal(obj)
if err != nil {
return nil, nil, fmt.Errorf("failed to remarshal object: %w", err)
}
}

switch v := obj.(type) {
case map[string]interface{}:
objs = append(objs, v)
Expand All @@ -265,6 +286,86 @@ func decodeJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage,
return objs, rawMessages, nil
}

type program struct {
prg cel.Program
ast *cel.Ast
}

func newProgram(src string) (*program, error) {
if src == "" {
return nil, nil
}

registry, err := types.NewRegistry()
if err != nil {
return nil, fmt.Errorf("failed to create env: %w", err)
}
env, err := cel.NewEnv(
cel.Declarations(decls.NewVar("obj", decls.Dyn)),
cel.OptionalTypes(cel.OptionalTypesVersion(lib.OptionalTypesVersion)),
cel.CustomTypeAdapter(&numberAdapter{registry}),
cel.CustomTypeProvider(registry),
)
if err != nil {
return nil, fmt.Errorf("failed to create env: %w", err)
}

ast, iss := env.Compile(src)
if iss.Err() != nil {
return nil, fmt.Errorf("failed compilation: %w", iss.Err())
}

prg, err := env.Program(ast)
if err != nil {
return nil, fmt.Errorf("failed program instantiation: %w", err)
}
return &program{prg: prg, ast: ast}, nil
}

var _ types.Adapter = (*numberAdapter)(nil)

type numberAdapter struct {
fallback types.Adapter
}

func (a *numberAdapter) NativeToValue(value any) ref.Val {
if n, ok := value.(json.Number); ok {
var errs []error
i, err := n.Int64()
if err == nil {
return types.Int(i)
}
errs = append(errs, err)
f, err := n.Float64()
if err == nil {
return types.Double(f)
}
errs = append(errs, err)
return types.NewErr("%v", errors.Join(errs...))
}
return a.fallback.NativeToValue(value)
}

func (p *program) eval(obj interface{}) (interface{}, error) {
out, _, err := p.prg.Eval(map[string]interface{}{"obj": obj})
if err != nil {
err = lib.DecoratedError{AST: p.ast, Err: err}
return nil, fmt.Errorf("failed eval: %w", err)
}

v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Value)(nil)))
if err != nil {
return nil, fmt.Errorf("failed proto conversion: %w", err)
}
switch v := v.(type) {
case *structpb.Value:
return v.AsInterface(), nil
default:
// This should never happen.
return nil, fmt.Errorf("unexpected native conversion type: %T", v)
}
}

func decodeJSONArray(raw *bytes.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, err error) {
dec := newJSONDecoder(raw)
token, err := dec.Token()
Expand Down
38 changes: 36 additions & 2 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func Test_httpReadJSON(t *testing.T) {
tests := []struct {
name string
body string
program string
wantObjs []mapstr.M
wantStatus int
wantErr bool
Expand Down Expand Up @@ -135,10 +136,43 @@ func Test_httpReadJSON(t *testing.T) {
},
wantStatus: http.StatusOK,
},
{
name: "kinesis",
body: `{
"requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
"timestamp": 1578090901599,
"records": [
{
"data": "aGVsbG8="
},
{
"data": "aGVsbG8gd29ybGQ="
}
]
}`,
program: `obj.records.map(r, {
"requestId": obj.requestId,
"timestamp": string(obj.timestamp), // leave timestamp in unix milli for ingest to handle.
"event": r,
})`,
wantRawMessage: []json.RawMessage{
[]byte(`{"event":{"data":"aGVsbG8="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
[]byte(`{"event":{"data":"aGVsbG8gd29ybGQ="},"requestId":"ed4acda5-034f-9f42-bba1-f29aea6d7d8f","timestamp":"1578090901599"}`),
},
wantObjs: []mapstr.M{
{"event": map[string]any{"data": "aGVsbG8="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
{"event": map[string]any{"data": "aGVsbG8gd29ybGQ="}, "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", "timestamp": "1578090901599"},
},
wantStatus: http.StatusOK,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotObjs, rawMessages, gotStatus, err := httpReadJSON(strings.NewReader(tt.body))
prg, err := newProgram(tt.program)
if err != nil {
t.Fatalf("failed to compile program: %v", err)
}
gotObjs, rawMessages, gotStatus, err := httpReadJSON(strings.NewReader(tt.body), prg)
if (err != nil) != tt.wantErr {
t.Errorf("httpReadJSON() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -344,7 +378,7 @@ func Test_apiResponse(t *testing.T) {
pub := new(publisher)
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), pub, logp.NewLogger("http_endpoint.test"), metrics)
apiHandler := newHandler(ctx, tracerConfig(tc.name, tc.conf, *withTraces), nil, pub, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand Down
15 changes: 12 additions & 3 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m
metrics.route.Set(u.Path)
metrics.isTLS.Set(e.tlsConfig != nil)

var prg *program
if e.config.Program != "" {
prg, err = newProgram(e.config.Program)
if err != nil {
return err
}
}

p.mu.Lock()
s, ok := p.servers[e.addr]
if ok {
Expand All @@ -149,7 +157,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m
return err
}
log.Infof("Adding %s end point to server on %s", pattern, e.addr)
s.mux.Handle(pattern, newHandler(s.ctx, e.config, pub, log, metrics))
s.mux.Handle(pattern, newHandler(s.ctx, e.config, prg, pub, log, metrics))
s.idOf[pattern] = ctx.ID
p.mu.Unlock()
<-s.ctx.Done()
Expand All @@ -165,7 +173,7 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, m
srv: srv,
}
s.ctx, s.cancel = ctxtool.WithFunc(ctx.Cancelation, func() { srv.Close() })
mux.Handle(pattern, newHandler(s.ctx, e.config, pub, log, metrics))
mux.Handle(pattern, newHandler(s.ctx, e.config, prg, pub, log, metrics))
p.servers[e.addr] = s
p.mu.Unlock()

Expand Down Expand Up @@ -287,7 +295,7 @@ func (s *server) getErr() error {
return s.err
}

func newHandler(ctx context.Context, c config, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
func newHandler(ctx context.Context, c config, prg *program, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
h := &handler{
log: log,
publisher: pub,
Expand All @@ -305,6 +313,7 @@ func newHandler(ctx context.Context, c config, pub stateless.Publisher, log *log
hmacType: c.HMACType,
hmacPrefix: c.HMACPrefix,
},
program: prg,
messageField: c.Prefix,
responseCode: c.ResponseCode,
responseBody: c.ResponseBody,
Expand Down
Loading