[filebeat][httpjson] - Separation of global transform contexts and introduction of parent transform context within chains #33499

Merged
merged 11 commits into from
Nov 8, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Add separation of transform context objects inside httpjson and introduce the new `.parent_last_response.*` clause. {pull}33499[33499]

*Auditbeat*

48 changes: 44 additions & 4 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
@@ -531,7 +531,7 @@ List of transforms to apply to the request before each execution.

Available transforms for request: [`append`, `delete`, `set`].

Can read state from: [`.first_response.*`,`.last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`].
Can read state from: [`.first_response.*`,`.last_response.*`, `.parent_last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`].

Can write state to: [`body.*`, `header.*`, `url.*`].

@@ -547,6 +547,47 @@ filebeat.inputs:
value: '[[now (parseDuration "-1h")]]'
----

NOTE: Use the `.parent_last_response.*` clause only from within chain steps, and only when pagination is configured at the root request level. If the root
request has no pagination, use the `.first_response.*` clause to access the parent response object from within chains. See the
<<parent-last-response,example>> below.


["source","yaml",subs="attributes",id="parent-last-response"]
filebeat.inputs:
- type: httpjson
  enabled: true
  id: my-httpjson-id
  request.url: http://xyz.com/services/data/v1.0/export_ids/page
  request.method: POST
  interval: 1h
  request.retry.max_attempts: 2
  request.retry.wait_min: 5s
  request.transforms:
  - set:
      target: body.page
      value: 0
  response.request_body_on_pagination: true
  response.pagination:
  - set:
      target: body.page
      value: '[[ .last_response.body.page ]]'
      fail_on_template_error: true
  chain:
  - step:
      request.url: http://xyz.com/services/data/v1.0/$.exportId/export_ids/$.files[:].id/info
      request.method: POST
      request.transforms:
      - set:
          target: body.exportId
          value: '[[ .parent_last_response.body.exportId ]]'
      replace: $.files[:].id
      replace_with: '$.exportId,.parent_last_response.body.exportId'

In this example, the chain step uses `.parent_last_response.body.exportId` only because `response.pagination` is configured for the parent (root) request.
If `response.pagination` were not configured for the parent (root) request, the `replace_with` clause should use `.first_response.body.exportId` instead. This is
because, when the parent request has no pagination, the `parent_last_response` object is not populated for performance reasons, whereas the
`first_response` object always stores the very first response in the process chain.
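
The relationship between the root context and a chain step's context can be sketched in Go as follows. This is a simplified model, not the input's implementation: the `response` and `transformContext` types, the `clone` body, and the `exportID` helper are hypothetical and only assume that a chain step's context keeps the first response and records the parent's current last response at the moment it is derived. It does not model the case where the root request has no pagination.

```go
package main

import "fmt"

// response is a hypothetical, reduced stand-in for a decoded response body.
type response struct {
	body map[string]interface{}
}

// transformContext is a hypothetical, reduced model of a transform context.
type transformContext struct {
	firstResponse      *response
	lastResponse       *response
	parentLastResponse *response
}

// clone derives a context for one chain step.
// ASSUMPTION: the child shares the first response, starts with an empty
// last response of its own, and records the parent's last response.
func (c *transformContext) clone() *transformContext {
	return &transformContext{
		firstResponse:      c.firstResponse,
		lastResponse:       &response{body: map[string]interface{}{}},
		parentLastResponse: c.lastResponse,
	}
}

// exportID reads body.exportId, returning "" when it is absent.
func exportID(r *response) string {
	if r == nil {
		return ""
	}
	id, _ := r.body["exportId"].(string)
	return id
}

func main() {
	page1 := &response{body: map[string]interface{}{"exportId": "2212"}}
	page2 := &response{body: map[string]interface{}{"exportId": "2213"}}

	// Initially the first and last responses are the same.
	root := &transformContext{firstResponse: page1, lastResponse: page1}
	step1 := root.clone()

	// Root pagination advances the parent's last response.
	root.lastResponse = page2
	step2 := root.clone()

	fmt.Println(exportID(step1.parentLastResponse)) // parent page 1
	fmt.Println(exportID(step2.parentLastResponse)) // parent page 2
	fmt.Println(exportID(step2.firstResponse))      // always the first page
}
```

In this model, a chain step derived after the second page sees that page through `parentLastResponse`, while `firstResponse` still points at the first page, which is why the two clauses diverge only when the root request paginates.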

[float]
==== `request.tracer.filename`

@@ -1141,7 +1182,7 @@ Collect and make events from response in any format supported by httpjson for al

The `replace_with: "pattern,value"` clause is used to replace a fixed pattern string defined in `request.url` with the given value.
The fixed pattern must have a `$.` prefix, for example: `$.xyz`. The `value` may be hard coded or extracted from context variables
like [`.last_response.*`, `.first_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause
like [`.last_response.*`, `.first_response.*`, `.parent_last_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause
thus providing a lot of flexibility in the logic of chain requests.

Example:
@@ -1217,8 +1258,7 @@ response_json using exportId as '2212':
----
This behaviour of targeted fixed pattern replacement in the url helps solve various use cases.

NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the
`replace_with` processor with exact string matching.
NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the `replace_with` processor with exact string matching.
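
The `"pattern,value"` handling described above can be illustrated with a short Go sketch. It is not the input's code: `applyReplaceWith` and `resolveValue` are hypothetical helpers, the flat `ctx` map keyed by the full expression is a simplification, and the rule that only values starting with a known context prefix are looked up is an assumption made for illustration. The split on `,` and the exact string match on the fixed pattern follow the description in the documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveValue is a hypothetical stand-in for a context lookup.
// ASSUMPTION: a value is treated as a context reference only when it starts
// with one of the known prefixes; anything else is used as a literal.
func resolveValue(ctx map[string]string, value string) (string, error) {
	prefixes := []string{".first_response.", ".last_response.", ".parent_last_response."}
	for _, p := range prefixes {
		if strings.HasPrefix(value, p) {
			v, ok := ctx[value]
			if !ok {
				return "", fmt.Errorf("no context value for %q", value)
			}
			return v, nil
		}
	}
	return value, nil
}

// applyReplaceWith splits clause into "pattern,value" and replaces the fixed
// pattern in rawURL by exact string matching. Because the clause is split on
// commas, neither part may contain one.
func applyReplaceWith(rawURL, clause string, ctx map[string]string) (string, error) {
	parts := strings.Split(clause, ",")
	if len(parts) != 2 {
		return "", fmt.Errorf("replace_with must be \"pattern,value\", got %q", clause)
	}
	pattern := strings.TrimSpace(parts[0])
	if !strings.HasPrefix(pattern, "$.") {
		return "", fmt.Errorf("fixed pattern %q must start with \"$.\"", pattern)
	}
	value, err := resolveValue(ctx, strings.TrimSpace(parts[1]))
	if err != nil {
		return "", err
	}
	return strings.ReplaceAll(rawURL, pattern, value), nil
}

func main() {
	ctx := map[string]string{".parent_last_response.body.exportId": "2213"}
	url := "http://xyz.com/$.exportId/$.files[:].id"
	clauses := []string{
		"$.exportId,2212",
		"$.exportId,.xyz.2212.abc.",
		"$.exportId,.parent_last_response.body.exportId",
	}
	for _, clause := range clauses {
		out, err := applyReplaceWith(url, clause, ctx)
		fmt.Println(out, err)
	}
}
```

Only the `$.exportId` pattern is touched here; the `$.files[:].id` placeholder is left for the separate `replace` clause to expand.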

[float]
==== `chain[].while`
155 changes: 151 additions & 4 deletions x-pack/filebeat/input/httpjson/input_test.go
@@ -7,7 +7,7 @@ package httpjson
import (
"context"
"fmt"
"io/ioutil"
"io"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -683,17 +683,155 @@ func TestInput(t *testing.T) {
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,first_response.body.exportId",
"replace_with": "$.exportId,.first_response.body.exportId",
},
},
},
},
handler: defaultHandler(http.MethodGet, "", ""),
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/2212/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/2212/2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,2212",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value containing '.' (dots)",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/.xyz.2212.abc./1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/.xyz.2212.abc./2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.xyz.2212.abc.",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test global transform context separation with parent_last_response object",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
var serverURL string
registerPaginationTransforms()
registerRequestTransforms()
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213"}`)
case "/2212/1":
matchBody(w, r, `{"exportId":"2212"}`, `{"hello":{"world":"moon"}}`)
case "/2212/2":
matchBody(w, r, `{"exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`)
case "/2213/3":
matchBody(w, r, `{"exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`)
case "/2213/4":
matchBody(w, r, `{"exportId":"2213"}`, `{"space":{"world":"moon"}}`)
}
})
server := httptest.NewServer(r)
t.Cleanup(func() { registeredTransforms = newRegistry() })
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodPost,
"response.request_body_on_pagination": true,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodPost,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.parent_last_response.body.exportId",
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.exportId",
"value": "[[ .parent_last_response.body.exportId ]]",
},
},
},
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
`{"hello":{"cake":"pumpkin"}}`,
`{"space":{"world":"moon"}}`,
},
},
}

for _, testCase := range testCases {
@@ -826,6 +964,15 @@ func newV2Context() (v2.Context, func()) {
}, cancel
}

//nolint:errcheck // We can safely ignore errors here
func matchBody(w io.Writer, req *http.Request, match, response string) {
body, _ := io.ReadAll(req.Body)
req.Body.Close()
if string(body) == match {
w.Write([]byte(response))
}
}

func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "application/json")
@@ -837,7 +984,7 @@ func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc {
w.WriteHeader(http.StatusBadRequest)
msg = fmt.Sprintf(`{"error":"expected method was %q"}`, expectedMethod)
case expectedBody != "":
body, _ := ioutil.ReadAll(r.Body)
body, _ := io.ReadAll(r.Body)
r.Body.Close()
if expectedBody != string(body) {
w.WriteHeader(http.StatusBadRequest)
29 changes: 14 additions & 15 deletions x-pack/filebeat/input/httpjson/request.go
@@ -303,7 +303,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
return fmt.Errorf("failed to execute rf.collectResponse: %w", err)
}
// store first response in transform context
var bodyMap mapstr.M
var bodyMap map[string]interface{}
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return fmt.Errorf("failed to read http response body: %w", err)
@@ -319,6 +319,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
body: bodyMap,
}
trCtx.updateFirstResponse(firstResponse)
// initially, the first response and the last response are the same
trCtx.updateLastResponse(firstResponse)

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
@@ -357,7 +359,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
urlCopy = rf.url
urlString = rf.url.String()

// new transform context for every chain step , derived from parent transform context
// new transform context for every chain step, derived from parent transform context
var chainTrCtx *transformContext
if rf.isChain {
chainTrCtx = trCtx.clone()
@@ -368,7 +370,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
var replaceArr []string
if rf.replaceWith != "" {
replaceArr = strings.Split(rf.replaceWith, ",")
val, doReplaceWith, err = fetchValueFromContext(trCtx, strings.TrimSpace(replaceArr[1]))
val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1]))
if err != nil {
return err
}
@@ -418,11 +420,11 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

var events <-chan maybeMsg
if rf.isChain {
events = rf.chainResponseProcessor.startProcessing(stdCtx, trCtx, resps)
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
} else {
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps)
}
n += processAndPublishEvents(trCtx, events, publisher, i < len(r.requestFactories), r.log)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}
}

@@ -517,7 +519,7 @@ func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, pu
return n
}

// processRemainingChainEvents , processes the remaining pagination events for chain blocks
// processRemainingChainEvents processes the remaining pagination events for chain blocks
func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
// we start from 0, and skip the 1st event since we have already processed it
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp)
@@ -542,7 +544,7 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t
}
response.Body = io.NopCloser(body)

// for each pagination response , we repeat all the chain steps / blocks
// for each pagination response, we repeat all the chain steps / blocks
count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log)
if err != nil {
r.log.Errorf("error processing chain event: %v", err)
@@ -592,18 +594,15 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
urlCopy = rf.url
urlString = rf.url.String()

// new transform context for every chain step , derived from parent transform context
var chainTrCtx *transformContext
if rf.isChain {
chainTrCtx = trCtx.clone()
}
// new transform context for every chain step, derived from parent transform context
chainTrCtx := trCtx.clone()

var val string
var doReplaceWith bool
var replaceArr []string
if rf.replaceWith != "" {
replaceArr = strings.Split(rf.replaceWith, ",")
val, doReplaceWith, err = fetchValueFromContext(trCtx, strings.TrimSpace(replaceArr[1]))
val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1]))
if err != nil {
return n, err
}
@@ -651,8 +650,8 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
}
resps = intermediateResps
}
events := rf.chainResponseProcessor.startProcessing(stdCtx, trCtx, resps)
n += processAndPublishEvents(trCtx, events, publisher, i < len(r.requestFactories), r.log)
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}

defer httpResp.Body.Close()