From 7c34e38fe9158d9513db402e47256bd7072ff141 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Tue, 29 Sep 2020 16:20:29 +0200 Subject: [PATCH] [Filebeat][httpjson] Make httpjson use cursor input when using date cursor (#20751) * Fix duplicate import * Move config to its own package * Minor improvements * Fix tests * Create input manager * Change requester to accept and store a cursor * Modify input to be embedded * Create stateless and cursor inputs * Initialize new input manager on publish * Add changelog entry and format files * Move test data folder * Change tests * Apply requested changes (cherry picked from commit 8f9d54bef44f9745c958bb727aed4c0e180c93cd) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/v2/input-cursor/manager.go | 7 +- .../filebeat/input/default-inputs/inputs.go | 2 +- x-pack/filebeat/input/httpjson/config.go | 58 +++++------ .../filebeat/input/httpjson/config_oauth.go | 73 +++++++------- .../input/httpjson/config_oauth_test.go | 38 ++++---- x-pack/filebeat/input/httpjson/config_test.go | 18 ++-- x-pack/filebeat/input/httpjson/date_cursor.go | 41 ++++---- x-pack/filebeat/input/httpjson/input.go | 96 +++++++++---------- .../filebeat/input/httpjson/input_cursor.go | 67 +++++++++++++ .../filebeat/input/httpjson/input_manager.go | 49 ++++++++++ .../input/httpjson/input_stateless.go | 58 +++++++++++ .../{httpjson_test.go => input_test.go} | 17 ++-- x-pack/filebeat/input/httpjson/pagination.go | 4 +- .../input/httpjson/pagination_test.go | 2 +- .../filebeat/input/httpjson/rate_limiter.go | 2 +- x-pack/filebeat/input/httpjson/requester.go | 36 +++++-- 17 files changed, 385 insertions(+), 184 deletions(-) create mode 100644 x-pack/filebeat/input/httpjson/input_cursor.go create mode 100644 x-pack/filebeat/input/httpjson/input_manager.go create mode 100644 x-pack/filebeat/input/httpjson/input_stateless.go rename x-pack/filebeat/input/httpjson/{httpjson_test.go => input_test.go} (96%) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 37c34eaf4f5..2c89fd4b66f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -685,6 +685,7 @@ field. You can revert this change by configuring tags for the module and omittin - Add type and sub_type to panw panos fileset {pull}20912[20912] - Always attempt community_id processor on zeek module {pull}21155[21155] - Add related.hosts ecs field to all modules {pull}21160[21160] +- Keep cursor state between httpjson input restarts {pull}20751[20751] *Heartbeat* diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go index 2a4310dc778..766d6f17fa0 100644 --- a/filebeat/input/v2/input-cursor/manager.go +++ b/filebeat/input/v2/input-cursor/manager.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/go-concert/unison" - input "github.com/elastic/beats/v7/filebeat/input/v2" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -145,7 +144,7 @@ func (cim *InputManager) shutdown() { // Create builds a new v2.Input using the provided Configure function. // The Input will run a go-routine per source that has been configured. -func (cim *InputManager) Create(config *common.Config) (input.Input, error) { +func (cim *InputManager) Create(config *common.Config) (v2.Input, error) { if err := cim.init(); err != nil { return nil, err } @@ -180,7 +179,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) { // Lock locks a key for exclusive access and returns an resource that can be used to modify // the cursor state and unlock the key. -func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) { +func (cim *InputManager) lock(ctx v2.Context, key string) (*resource, error) { resource := cim.store.Get(key) err := lockResource(ctx.Logger, resource, ctx.Cancelation) if err != nil { @@ -190,7 +189,7 @@ func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) return resource, nil } -func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error { +func lockResource(log *logp.Logger, resource *resource, canceler v2.Canceler) error { if !resource.lock.TryLock() { log.Infof("Resource '%v' currently in use, waiting...", resource.key) err := resource.lock.LockContext(canceler) diff --git a/x-pack/filebeat/input/default-inputs/inputs.go b/x-pack/filebeat/input/default-inputs/inputs.go index 1fe245b80f7..cd8562560da 100644 --- a/x-pack/filebeat/input/default-inputs/inputs.go +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -27,7 +27,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 return []v2.Plugin{ cloudfoundry.Plugin(), http_endpoint.Plugin(), - httpjson.Plugin(), + httpjson.Plugin(log, store), o365audit.Plugin(log, store), } } diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 95ca205be0d..ee1445b8a3d 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -17,9 +17,9 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) -// Config contains information about httpjson configuration +// config contains information about httpjson configuration type config struct { - OAuth2 *OAuth2 `config:"oauth2"` + OAuth2 *oauth2Config `config:"oauth2"` APIKey string `config:"api_key"` AuthenticationScheme string `config:"authentication_scheme"` HTTPClientTimeout time.Duration `config:"http_client_timeout"` @@ -30,21 +30,21 @@ type config struct { JSONObjects string `config:"json_objects_array"` SplitEventsBy string `config:"split_events_by"` NoHTTPBody bool `config:"no_http_body"` - Pagination *Pagination `config:"pagination"` - RateLimit *RateLimit `config:"rate_limit"` + Pagination *paginationConfig `config:"pagination"` + RateLimit *rateLimitConfig `config:"rate_limit"` RetryMax int `config:"retry.max_attempts"` RetryWaitMin time.Duration `config:"retry.wait_min"` RetryWaitMax time.Duration `config:"retry.wait_max"` TLS *tlscommon.Config `config:"ssl"` - URL *URL `config:"url" validate:"required"` - DateCursor *DateCursor `config:"date_cursor"` + URL *urlConfig `config:"url" validate:"required"` + DateCursor *dateCursorConfig `config:"date_cursor"` } // Pagination contains information about httpjson pagination settings -type Pagination struct { +type paginationConfig struct { Enabled *bool `config:"enabled"` ExtraBodyContent common.MapStr `config:"extra_body_content"` - Header *Header `config:"header"` + Header *headerConfig `config:"header"` IDField string `config:"id_field"` RequestField string `config:"req_field"` URLField string `config:"url_field"` @@ -52,76 +52,76 @@ type Pagination struct { } // IsEnabled returns true if the `enable` field is set to true in the yaml. -func (p *Pagination) IsEnabled() bool { +func (p *paginationConfig) isEnabled() bool { return p != nil && (p.Enabled == nil || *p.Enabled) } // HTTP Header information for pagination -type Header struct { +type headerConfig struct { FieldName string `config:"field_name" validate:"required"` RegexPattern *regexp.Regexp `config:"regex_pattern" validate:"required"` } // HTTP Header Rate Limit information -type RateLimit struct { +type rateLimitConfig struct { Limit string `config:"limit"` Reset string `config:"reset"` Remaining string `config:"remaining"` } -type DateCursor struct { - Enabled *bool `config:"enabled"` - Field string `config:"field"` - URLField string `config:"url_field" validate:"required"` - ValueTemplate *Template `config:"value_template"` - DateFormat string `config:"date_format"` - InitialInterval time.Duration `config:"initial_interval"` +type dateCursorConfig struct { + Enabled *bool `config:"enabled"` + Field string `config:"field"` + URLField string `config:"url_field" validate:"required"` + ValueTemplate *templateConfig `config:"value_template"` + DateFormat string `config:"date_format"` + InitialInterval time.Duration `config:"initial_interval"` } -type Template struct { +type templateConfig struct { *template.Template } -func (t *Template) Unpack(in string) error { +func (t *templateConfig) Unpack(in string) error { tpl, err := template.New("tpl").Parse(in) if err != nil { return err } - *t = Template{Template: tpl} + *t = templateConfig{Template: tpl} return nil } -type URL struct { +type urlConfig struct { *url.URL } -func (u *URL) Unpack(in string) error { +func (u *urlConfig) Unpack(in string) error { parsed, err := url.Parse(in) if err != nil { return err } - *u = URL{URL: parsed} + *u = urlConfig{URL: parsed} return nil } // IsEnabled returns true if the `enable` field is set to true in the yaml. -func (dc *DateCursor) IsEnabled() bool { +func (dc *dateCursorConfig) isEnabled() bool { return dc != nil && (dc.Enabled == nil || *dc.Enabled) } // IsEnabled returns true if the `enable` field is set to true in the yaml. -func (dc *DateCursor) GetDateFormat() string { +func (dc *dateCursorConfig) getDateFormat() string { if dc.DateFormat == "" { return time.RFC3339 } return dc.DateFormat } -func (dc *DateCursor) Validate() error { +func (dc *dateCursorConfig) Validate() error { if dc.DateFormat == "" { return nil } @@ -154,7 +154,7 @@ func (c *config) Validate() error { } } } - if c.OAuth2.IsEnabled() { + if c.OAuth2.isEnabled() { if c.APIKey != "" || c.AuthenticationScheme != "" { return errors.New("invalid configuration: oauth2 and api_key or authentication_scheme cannot be set simultaneously") } @@ -162,7 +162,7 @@ func (c *config) Validate() error { return nil } -func defaultConfig() config { +func newDefaultConfig() config { var c config c.HTTPMethod = "GET" c.HTTPClientTimeout = 60 * time.Second diff --git a/x-pack/filebeat/input/httpjson/config_oauth.go b/x-pack/filebeat/input/httpjson/config_oauth.go index 0ff55dcbc33..d7412fd0ba8 100644 --- a/x-pack/filebeat/input/httpjson/config_oauth.go +++ b/x-pack/filebeat/input/httpjson/config_oauth.go @@ -20,32 +20,32 @@ import ( "golang.org/x/oauth2/google" ) -// An OAuth2Provider represents a supported oauth provider. -type OAuth2Provider string +// An oauth2Provider represents a supported oauth provider. +type oauth2Provider string const ( - OAuth2ProviderDefault OAuth2Provider = "" // OAuth2ProviderDefault means no specific provider is set. - OAuth2ProviderAzure OAuth2Provider = "azure" // OAuth2ProviderAzure AzureAD. - OAuth2ProviderGoogle OAuth2Provider = "google" // OAuth2ProviderGoogle Google. + oauth2ProviderDefault oauth2Provider = "" // OAuth2ProviderDefault means no specific provider is set. + oauth2ProviderAzure oauth2Provider = "azure" // OAuth2ProviderAzure AzureAD. + oauth2ProviderGoogle oauth2Provider = "google" // OAuth2ProviderGoogle Google. ) -func (p *OAuth2Provider) Unpack(in string) error { - *p = OAuth2Provider(in) +func (p *oauth2Provider) Unpack(in string) error { + *p = oauth2Provider(in) return nil } -func (p OAuth2Provider) canonical() OAuth2Provider { - return OAuth2Provider(strings.ToLower(string(p))) +func (p oauth2Provider) canonical() oauth2Provider { + return oauth2Provider(strings.ToLower(string(p))) } -// OAuth2 contains information about oauth2 authentication settings. -type OAuth2 struct { +// oauth2Config contains information about oauth2 authentication settings. +type oauth2Config struct { // common oauth fields ClientID string `config:"client.id"` ClientSecret string `config:"client.secret"` Enabled *bool `config:"enabled"` EndpointParams map[string][]string `config:"endpoint_params"` - Provider OAuth2Provider `config:"provider"` + Provider oauth2Provider `config:"provider"` Scopes []string `config:"scopes"` TokenURL string `config:"token_url"` @@ -61,25 +61,26 @@ type OAuth2 struct { } // IsEnabled returns true if the `enable` field is set to true in the yaml. -func (o *OAuth2) IsEnabled() bool { +func (o *oauth2Config) isEnabled() bool { return o != nil && (o.Enabled == nil || *o.Enabled) } // Client wraps the given http.Client and returns a new one that will use the oauth authentication. -func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client, error) { - ctx = context.WithValue(ctx, oauth2.HTTPClient, client) +func (o *oauth2Config) client(ctx context.Context, client *http.Client) (*http.Client, error) { + // only required to let oauth2 library to find our custom client in the context + ctx = context.WithValue(context.Background(), oauth2.HTTPClient, client) - switch o.GetProvider() { - case OAuth2ProviderAzure, OAuth2ProviderDefault: + switch o.getProvider() { + case oauth2ProviderAzure, oauth2ProviderDefault: creds := clientcredentials.Config{ ClientID: o.ClientID, ClientSecret: o.ClientSecret, - TokenURL: o.GetTokenURL(), + TokenURL: o.getTokenURL(), Scopes: o.Scopes, - EndpointParams: o.GetEndpointParams(), + EndpointParams: o.getEndpointParams(), } return creds.Client(ctx), nil - case OAuth2ProviderGoogle: + case oauth2ProviderGoogle: if o.GoogleJWTFile != "" { cfg, err := google.JWTConfigFromJSON(o.GoogleCredentialsJSON, o.Scopes...) if err != nil { @@ -100,9 +101,9 @@ func (o *OAuth2) Client(ctx context.Context, client *http.Client) (*http.Client, } // GetTokenURL returns the TokenURL. -func (o *OAuth2) GetTokenURL() string { - switch o.GetProvider() { - case OAuth2ProviderAzure: +func (o *oauth2Config) getTokenURL() string { + switch o.getProvider() { + case oauth2ProviderAzure: if o.TokenURL == "" { return endpoints.AzureAD(o.AzureTenantID).TokenURL } @@ -112,14 +113,14 @@ func (o *OAuth2) GetTokenURL() string { } // GetProvider returns provider in its canonical form. -func (o OAuth2) GetProvider() OAuth2Provider { +func (o oauth2Config) getProvider() oauth2Provider { return o.Provider.canonical() } // GetEndpointParams returns endpoint params with any provider ones combined. -func (o OAuth2) GetEndpointParams() map[string][]string { - switch o.GetProvider() { - case OAuth2ProviderAzure: +func (o oauth2Config) getEndpointParams() map[string][]string { + switch o.getProvider() { + case oauth2ProviderAzure: if o.AzureResource != "" { if o.EndpointParams == nil { o.EndpointParams = map[string][]string{} @@ -132,18 +133,18 @@ func (o OAuth2) GetEndpointParams() map[string][]string { } // Validate checks if oauth2 config is valid. -func (o *OAuth2) Validate() error { - switch o.GetProvider() { - case OAuth2ProviderAzure: +func (o *oauth2Config) Validate() error { + switch o.getProvider() { + case oauth2ProviderAzure: return o.validateAzureProvider() - case OAuth2ProviderGoogle: + case oauth2ProviderGoogle: return o.validateGoogleProvider() - case OAuth2ProviderDefault: + case oauth2ProviderDefault: if o.TokenURL == "" || o.ClientID == "" || o.ClientSecret == "" { return errors.New("invalid configuration: both token_url and client credentials must be provided") } default: - return fmt.Errorf("invalid configuration: unknown provider %q", o.GetProvider()) + return fmt.Errorf("invalid configuration: unknown provider %q", o.getProvider()) } return nil } @@ -151,7 +152,7 @@ func (o *OAuth2) Validate() error { // findDefaultGoogleCredentials will default to google.FindDefaultCredentials and will only be changed for testing purposes var findDefaultGoogleCredentials = google.FindDefaultCredentials -func (o *OAuth2) validateGoogleProvider() error { +func (o *oauth2Config) validateGoogleProvider() error { if o.TokenURL != "" || o.ClientID != "" || o.ClientSecret != "" || o.AzureTenantID != "" || o.AzureResource != "" || len(o.EndpointParams) > 0 { return errors.New("invalid configuration: none of token_url and client credentials can be used, use google.credentials_file, google.jwt_file, google.credentials_json or ADC instead") @@ -191,7 +192,7 @@ func (o *OAuth2) validateGoogleProvider() error { return fmt.Errorf("invalid configuration: no authentication credentials were configured or detected (ADC)") } -func (o *OAuth2) populateCredentialsJSONFromFile(file string) error { +func (o *oauth2Config) populateCredentialsJSONFromFile(file string) error { if _, err := os.Stat(file); os.IsNotExist(err) { return fmt.Errorf("invalid configuration: the file %q cannot be found", file) } @@ -210,7 +211,7 @@ func (o *OAuth2) populateCredentialsJSONFromFile(file string) error { return nil } -func (o *OAuth2) validateAzureProvider() error { +func (o *oauth2Config) validateAzureProvider() error { if o.TokenURL == "" && o.AzureTenantID == "" { return errors.New("invalid configuration: at least one of token_url or tenant_id must be provided") } diff --git a/x-pack/filebeat/input/httpjson/config_oauth_test.go b/x-pack/filebeat/input/httpjson/config_oauth_test.go index 3fa0eed4284..67ec63b6650 100644 --- a/x-pack/filebeat/input/httpjson/config_oauth_test.go +++ b/x-pack/filebeat/input/httpjson/config_oauth_test.go @@ -11,8 +11,8 @@ import ( func TestProviderCanonical(t *testing.T) { const ( - a OAuth2Provider = "gOoGle" - b OAuth2Provider = "google" + a oauth2Provider = "gOoGle" + b oauth2Provider = "google" ) if a.canonical() != b.canonical() { @@ -21,74 +21,74 @@ func TestProviderCanonical(t *testing.T) { } func TestGetProviderIsCanonical(t *testing.T) { - const expected OAuth2Provider = "google" + const expected oauth2Provider = "google" - oauth2 := OAuth2{Provider: "GOogle"} - if oauth2.GetProvider() != expected { + oauth2 := oauth2Config{Provider: "GOogle"} + if oauth2.getProvider() != expected { t.Fatal("GetProvider should return canonical provider") } } func TestIsEnabled(t *testing.T) { - oauth2 := OAuth2{} - if !oauth2.IsEnabled() { + oauth2 := oauth2Config{} + if !oauth2.isEnabled() { t.Fatal("OAuth2 should be enabled by default") } var enabled = false oauth2.Enabled = &enabled - if oauth2.IsEnabled() { + if oauth2.isEnabled() { t.Fatal("OAuth2 should be disabled") } enabled = true - if !oauth2.IsEnabled() { + if !oauth2.isEnabled() { t.Fatal("OAuth2 should be enabled") } } func TestGetTokenURL(t *testing.T) { const expected = "http://localhost" - oauth2 := OAuth2{TokenURL: "http://localhost"} - if got := oauth2.GetTokenURL(); got != expected { + oauth2 := oauth2Config{TokenURL: "http://localhost"} + if got := oauth2.getTokenURL(); got != expected { t.Fatalf("GetTokenURL should return the provided TokenURL but got %q", got) } } func TestGetTokenURLWithAzure(t *testing.T) { const expectedWithoutTenantID = "http://localhost" - oauth2 := OAuth2{TokenURL: "http://localhost", Provider: "azure"} - if got := oauth2.GetTokenURL(); got != expectedWithoutTenantID { + oauth2 := oauth2Config{TokenURL: "http://localhost", Provider: "azure"} + if got := oauth2.getTokenURL(); got != expectedWithoutTenantID { t.Fatalf("GetTokenURL should return the provided TokenURL but got %q", got) } oauth2.TokenURL = "" oauth2.AzureTenantID = "a_tenant_id" const expectedWithTenantID = "https://login.microsoftonline.com/a_tenant_id/oauth2/v2.0/token" - if got := oauth2.GetTokenURL(); got != expectedWithTenantID { + if got := oauth2.getTokenURL(); got != expectedWithTenantID { t.Fatalf("GetTokenURL should return the generated TokenURL but got %q", got) } } func TestGetEndpointParams(t *testing.T) { var expected = map[string][]string{"foo": {"bar"}} - oauth2 := OAuth2{EndpointParams: map[string][]string{"foo": {"bar"}}} - if got := oauth2.GetEndpointParams(); !reflect.DeepEqual(got, expected) { + oauth2 := oauth2Config{EndpointParams: map[string][]string{"foo": {"bar"}}} + if got := oauth2.getEndpointParams(); !reflect.DeepEqual(got, expected) { t.Fatalf("GetEndpointParams should return the provided EndpointParams but got %q", got) } } func TestGetEndpointParamsWithAzure(t *testing.T) { var expectedWithoutResource = map[string][]string{"foo": {"bar"}} - oauth2 := OAuth2{Provider: "azure", EndpointParams: map[string][]string{"foo": {"bar"}}} - if got := oauth2.GetEndpointParams(); !reflect.DeepEqual(got, expectedWithoutResource) { + oauth2 := oauth2Config{Provider: "azure", EndpointParams: map[string][]string{"foo": {"bar"}}} + if got := oauth2.getEndpointParams(); !reflect.DeepEqual(got, expectedWithoutResource) { t.Fatalf("GetEndpointParams should return the provided EndpointParams but got %q", got) } oauth2.AzureResource = "baz" var expectedWithResource = map[string][]string{"foo": {"bar"}, "resource": {"baz"}} - if got := oauth2.GetEndpointParams(); !reflect.DeepEqual(got, expectedWithResource) { + if got := oauth2.getEndpointParams(); !reflect.DeepEqual(got, expectedWithResource) { t.Fatalf("GetEndpointParams should return the provided EndpointParams but got %q", got) } } diff --git a/x-pack/filebeat/input/httpjson/config_test.go b/x-pack/filebeat/input/httpjson/config_test.go index 0de07311239..85c7c64848d 100644 --- a/x-pack/filebeat/input/httpjson/config_test.go +++ b/x-pack/filebeat/input/httpjson/config_test.go @@ -25,7 +25,7 @@ func TestConfigValidationCase1(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and http_request_body cannot coexist.") } @@ -39,7 +39,7 @@ func TestConfigValidationCase2(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and pagination.extra_body_content cannot coexist.") } @@ -53,7 +53,7 @@ func TestConfigValidationCase3(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. no_http_body and pagination.req_field cannot coexist.") } @@ -66,7 +66,7 @@ func TestConfigValidationCase4(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and pagination.req_field cannot coexist.") } @@ -79,7 +79,7 @@ func TestConfigValidationCase5(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and pagination.id_field cannot coexist.") } @@ -92,7 +92,7 @@ func TestConfigValidationCase6(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. pagination.header and extra_body_content cannot coexist.") } @@ -105,7 +105,7 @@ func TestConfigValidationCase7(t *testing.T) { "url": "localhost", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := newDefaultConfig() if err := cfg.Unpack(&conf); err == nil { t.Fatal("Configuration validation failed. http_method DELETE is not allowed.") } @@ -116,7 +116,7 @@ func TestConfigMustFailWithInvalidURL(t *testing.T) { "url": "::invalid::", } cfg := common.MustNewConfigFrom(m) - conf := defaultConfig() + conf := newDefaultConfig() err := cfg.Unpack(&conf) assert.EqualError(t, err, `parse "::invalid::": missing protocol scheme accessing 'url'`) } @@ -414,7 +414,7 @@ func TestConfigOauth2Validation(t *testing.T) { } cfg := common.MustNewConfigFrom(c.input) - conf := defaultConfig() + conf := newDefaultConfig() err := cfg.Unpack(&conf) switch { diff --git a/x-pack/filebeat/input/httpjson/date_cursor.go b/x-pack/filebeat/input/httpjson/date_cursor.go index 2a9db44bd2a..66ca659de78 100644 --- a/x-pack/filebeat/input/httpjson/date_cursor.go +++ b/x-pack/filebeat/input/httpjson/date_cursor.go @@ -7,6 +7,7 @@ package httpjson import ( "bytes" "net/url" + "text/template" "time" "github.com/elastic/beats/v7/libbeat/common" @@ -22,13 +23,12 @@ type dateCursor struct { initialInterval time.Duration dateFormat string - value string - valueTpl *Template + valueTpl *template.Template } func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor { c := &dateCursor{ - enabled: config.DateCursor.IsEnabled(), + enabled: config.DateCursor.isEnabled(), url: *config.URL.URL, } @@ -40,23 +40,23 @@ func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor { c.field = config.DateCursor.Field c.urlField = config.DateCursor.URLField c.initialInterval = config.DateCursor.InitialInterval - c.dateFormat = config.DateCursor.GetDateFormat() - c.valueTpl = config.DateCursor.ValueTemplate + c.dateFormat = config.DateCursor.getDateFormat() + c.valueTpl = config.DateCursor.ValueTemplate.Template return c } -func (c *dateCursor) getURL() string { +func (c *dateCursor) getURL(prevValue string) string { if !c.enabled { return c.url.String() } var dateStr string - if c.value == "" { + if prevValue == "" { t := timeNow().UTC().Add(-c.initialInterval) dateStr = t.Format(c.dateFormat) } else { - dateStr = c.value + dateStr = prevValue } q := c.url.Query() @@ -66,7 +66,7 @@ func (c *dateCursor) getURL() string { value = dateStr } else { buf := new(bytes.Buffer) - if err := c.valueTpl.Template.Execute(buf, dateStr); err != nil { + if err := c.valueTpl.Execute(buf, dateStr); err != nil { return c.url.String() } value = buf.String() @@ -74,32 +74,33 @@ func (c *dateCursor) getURL() string { q.Set(c.urlField, value) - c.url.RawQuery = q.Encode() + url := c.url + url.RawQuery = q.Encode() - return c.url.String() + return url.String() } -func (c *dateCursor) advance(m common.MapStr) { +func (c *dateCursor) getNextValue(m common.MapStr) string { if c.field == "" { - c.value = time.Now().UTC().Format(c.dateFormat) - return + return time.Now().UTC().Format(c.dateFormat) } v, err := m.GetValue(c.field) if err != nil { c.log.Warnf("date_cursor field: %q", err) - return + return "" } + switch t := v.(type) { case string: _, err := time.Parse(c.dateFormat, t) if err != nil { c.log.Warn("date_cursor field does not have the expected layout") - return + return "" } - c.value = t - default: - c.log.Warn("date_cursor field must be a string, cursor will not advance") - return + return t } + + c.log.Warn("date_cursor field must be a string, cursor will not advance") + return "" } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 5ba3ce912bb..4a905666f50 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -9,12 +9,14 @@ import ( "fmt" "net" "net/http" + "net/url" "time" "github.com/hashicorp/go-retryablehttp" "go.uber.org/zap" v2 "github.com/elastic/beats/v7/filebeat/input/v2" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -63,30 +65,25 @@ func (log *retryLogger) Warn(format string, args ...interface{}) { log.log.Warnf(format, args...) } -type httpJSONInput struct { - config config - tlsConfig *tlscommon.TLSConfig -} - -func Plugin() v2.Plugin { +func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { + sim := stateless.NewInputManager(statelessConfigure) return v2.Plugin{ Name: inputName, Stability: feature.Beta, Deprecated: false, - Manager: stateless.NewInputManager(configure), - } -} - -func configure(cfg *common.Config) (stateless.Input, error) { - conf := defaultConfig() - if err := cfg.Unpack(&conf); err != nil { - return nil, err + Manager: inputManager{ + stateless: &sim, + cursor: &cursor.InputManager{ + Logger: log, + StateStore: store, + Type: inputName, + Configure: cursorConfigure, + }, + }, } - - return newHTTPJSONInput(conf) } -func newHTTPJSONInput(config config) (*httpJSONInput, error) { +func newTLSConfig(config config) (*tlscommon.TLSConfig, error) { if err := config.Validate(); err != nil { return nil, err } @@ -96,54 +93,53 @@ func newHTTPJSONInput(config config) (*httpJSONInput, error) { return nil, err } - return &httpJSONInput{ - config: config, - tlsConfig: tlsConfig, - }, nil + return tlsConfig, nil } -func (*httpJSONInput) Name() string { return inputName } - -func (in *httpJSONInput) Test(v2.TestContext) error { +func test(url *url.URL) error { port := func() string { - if in.config.URL.Port() != "" { - return in.config.URL.Port() + if url.Port() != "" { + return url.Port() } - switch in.config.URL.Scheme { + switch url.Scheme { case "https": return "443" } return "80" }() - _, err := net.DialTimeout("tcp", net.JoinHostPort(in.config.URL.Hostname(), port), time.Second) + _, err := net.DialTimeout("tcp", net.JoinHostPort(url.Hostname(), port), time.Second) if err != nil { - return fmt.Errorf("url %q is unreachable", in.config.URL) + return fmt.Errorf("url %q is unreachable", url) } return nil } -// Run starts the input and blocks until it ends the execution. -// It will return on context cancellation, any other error will be retried. -func (in *httpJSONInput) Run(ctx v2.Context, publisher stateless.Publisher) error { - log := ctx.Logger.With("url", in.config.URL) +func run( + ctx v2.Context, + config config, + tlsConfig *tlscommon.TLSConfig, + publisher cursor.Publisher, + cursor *cursor.Cursor, +) error { + log := ctx.Logger.With("url", config.URL) stdCtx := ctxtool.FromCanceller(ctx.Cancelation) - httpClient, err := in.newHTTPClient(stdCtx) + httpClient, err := newHTTPClient(stdCtx, config, tlsConfig) if err != nil { return err } - dateCursor := newDateCursorFromConfig(in.config, log) + dateCursor := newDateCursorFromConfig(config, log) - rateLimiter := newRateLimiterFromConfig(in.config, log) + rateLimiter := newRateLimiterFromConfig(config, log) - pagination := newPaginationFromConfig(in.config) + pagination := newPaginationFromConfig(config) requester := newRequester( - in.config, + config, rateLimiter, dateCursor, pagination, @@ -151,12 +147,14 @@ func (in *httpJSONInput) Run(ctx v2.Context, publisher stateless.Publisher) erro log, ) + requester.loadCursor(cursor, log) + // TODO: disallow passing interval = 0 as a mean to run once. - if in.config.Interval == 0 { + if config.Interval == 0 { return requester.processHTTPRequest(stdCtx, publisher) } - err = timed.Periodic(stdCtx, in.config.Interval, func() error { + err = timed.Periodic(stdCtx, config.Interval, func() error { log.Info("Process another repeated request.") if err := requester.processHTTPRequest(stdCtx, publisher); err != nil { log.Error(err) @@ -169,29 +167,29 @@ func (in *httpJSONInput) Run(ctx v2.Context, publisher stateless.Publisher) erro return nil } -func (in *httpJSONInput) newHTTPClient(ctx context.Context) (*http.Client, error) { +func newHTTPClient(ctx context.Context, config config, tlsConfig *tlscommon.TLSConfig) (*http.Client, error) { // Make retryable HTTP client client := &retryablehttp.Client{ HTTPClient: &http.Client{ Transport: &http.Transport{ DialContext: (&net.Dialer{ - Timeout: in.config.HTTPClientTimeout, + Timeout: config.HTTPClientTimeout, }).DialContext, - TLSClientConfig: in.tlsConfig.ToConfig(), + TLSClientConfig: tlsConfig.ToConfig(), DisableKeepAlives: true, }, - Timeout: in.config.HTTPClientTimeout, + Timeout: config.HTTPClientTimeout, }, Logger: newRetryLogger(), - RetryWaitMin: in.config.RetryWaitMin, - RetryWaitMax: in.config.RetryWaitMax, - RetryMax: in.config.RetryMax, + RetryWaitMin: config.RetryWaitMin, + RetryWaitMax: config.RetryWaitMax, + RetryMax: config.RetryMax, CheckRetry: retryablehttp.DefaultRetryPolicy, Backoff: retryablehttp.DefaultBackoff, } - if in.config.OAuth2.IsEnabled() { - return in.config.OAuth2.Client(ctx, client.StandardClient()) + if config.OAuth2.isEnabled() { + return config.OAuth2.client(ctx, client.StandardClient()) } return client.StandardClient(), nil diff --git a/x-pack/filebeat/input/httpjson/input_cursor.go b/x-pack/filebeat/input/httpjson/input_cursor.go new file mode 100644 index 00000000000..d18a91f3918 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/input_cursor.go @@ -0,0 +1,67 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" +) + +type cursorInput struct{} + +func (cursorInput) Name() string { + return "httpjson-cursor" +} + +type source struct { + config config + tlsConfig *tlscommon.TLSConfig +} + +func (src source) Name() string { + return src.config.URL.String() +} + +func cursorConfigure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) { + conf := newDefaultConfig() + if err := cfg.Unpack(&conf); err != nil { + return nil, nil, err + } + return newCursorInput(conf) +} + +func newCursorInput(config config) ([]cursor.Source, cursor.Input, error) { + tlsConfig, err := newTLSConfig(config) + if err != nil { + return nil, nil, err + } + // we only allow one url per config, if we wanted to allow more than one + // each source should hold only one url + return []cursor.Source{ + &source{config: config, + tlsConfig: tlsConfig, + }, + }, + &cursorInput{}, + nil +} + +func (in *cursorInput) Test(src cursor.Source, _ v2.TestContext) error { + return test((src.(*source)).config.URL.URL) +} + +// Run starts the input and blocks until it ends the execution. +// It will return on context cancellation, any other error will be retried. +func (in *cursorInput) Run( + ctx v2.Context, + src cursor.Source, + cursor cursor.Cursor, + publisher cursor.Publisher, +) error { + s := src.(*source) + return run(ctx, s.config, s.tlsConfig, publisher, &cursor) +} diff --git a/x-pack/filebeat/input/httpjson/input_manager.go b/x-pack/filebeat/input/httpjson/input_manager.go new file mode 100644 index 00000000000..21f5066dc05 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/input_manager.go @@ -0,0 +1,49 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "go.uber.org/multierr" + + "github.com/elastic/go-concert/unison" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/common" +) + +// inputManager wraps one stateless input manager +// and one cursor input manager. It will create one or the other +// based on the config that is passed. +type inputManager struct { + stateless *stateless.InputManager + cursor *cursor.InputManager +} + +var _ v2.InputManager = inputManager{} + +// Init initializes both wrapped input managers. +func (m inputManager) Init(grp unison.Group, mode v2.Mode) error { + return multierr.Append( + m.stateless.Init(grp, mode), + m.cursor.Init(grp, mode), + ) +} + +// Create creates a cursor input manager if the config has a date cursor set up, +// otherwise it creates a stateless input manager. +func (m inputManager) Create(cfg *common.Config) (v2.Input, error) { + var config config + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + if config.DateCursor != nil { + return m.cursor.Create(cfg) + } + + return m.stateless.Create(cfg) +} diff --git a/x-pack/filebeat/input/httpjson/input_stateless.go b/x-pack/filebeat/input/httpjson/input_stateless.go new file mode 100644 index 00000000000..c7ebf6c3d4c --- /dev/null +++ b/x-pack/filebeat/input/httpjson/input_stateless.go @@ -0,0 +1,58 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" +) + +type statelessInput struct { + config config + tlsConfig *tlscommon.TLSConfig +} + +func (statelessInput) Name() string { + return "httpjson-stateless" +} + +func statelessConfigure(cfg *common.Config) (stateless.Input, error) { + conf := newDefaultConfig() + if err := cfg.Unpack(&conf); err != nil { + return nil, err + } + return newStatelessInput(conf) +} + +func newStatelessInput(config config) (*statelessInput, error) { + tlsConfig, err := newTLSConfig(config) + if err != nil { + return nil, err + } + return &statelessInput{config: config, tlsConfig: tlsConfig}, nil +} + +func (in *statelessInput) Test(v2.TestContext) error { + return test(in.config.URL.URL) +} + +type statelessPublisher struct { + wrapped stateless.Publisher +} + +func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error { + pub.wrapped.Publish(event) + return nil +} + +// Run starts the input and blocks until it ends the execution. +// It will return on context cancellation, any other error will be retried. +func (in *statelessInput) Run(ctx v2.Context, publisher stateless.Publisher) error { + pub := statelessPublisher{wrapped: publisher} + return run(ctx, in.config, in.tlsConfig, pub, nil) +} diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/input_test.go similarity index 96% rename from x-pack/filebeat/input/httpjson/httpjson_test.go rename to x-pack/filebeat/input/httpjson/input_test.go index b541c16002e..242811d2795 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -23,7 +23,7 @@ import ( beattest "github.com/elastic/beats/v7/libbeat/publisher/testing" ) -func TestHTTPJSONInput(t *testing.T) { +func TestStatelessHTTPJSONInput(t *testing.T) { testCases := []struct { name string setupServer func(*testing.T, http.HandlerFunc, map[string]interface{}) @@ -224,20 +224,23 @@ func TestHTTPJSONInput(t *testing.T) { cfg := common.MustNewConfigFrom(tc.baseConfig) - input, err := configure(cfg) + conf := newDefaultConfig() + assert.NoError(t, cfg.Unpack(&conf)) + + input, err := newStatelessInput(conf) assert.NoError(t, err) - assert.Equal(t, "httpjson", input.Name()) + assert.Equal(t, "httpjson-stateless", input.Name()) assert.NoError(t, input.Test(v2.TestContext{})) - pub := beattest.NewChanClient(len(tc.expected)) - t.Cleanup(func() { _ = pub.Close() }) + chanClient := beattest.NewChanClient(len(tc.expected)) + t.Cleanup(func() { _ = chanClient.Close() }) ctx, cancel := newV2Context() t.Cleanup(cancel) var g errgroup.Group - g.Go(func() error { return input.Run(ctx, pub) }) + g.Go(func() error { return input.Run(ctx, chanClient) }) timeout := time.NewTimer(5 * time.Second) t.Cleanup(func() { _ = timeout.Stop() }) @@ -249,7 +252,7 @@ func TestHTTPJSONInput(t *testing.T) { case <-timeout.C: t.Errorf("timed out waiting for %d events", len(tc.expected)) return - case got := <-pub.Channel: + case got := <-chanClient.Channel: val, err := got.Fields.GetValue("message") assert.NoError(t, err) assert.JSONEq(t, tc.expected[receivedCount], val.(string)) diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go index 9a7bf82b2b4..020bc783055 100644 --- a/x-pack/filebeat/input/httpjson/pagination.go +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -16,7 +16,7 @@ import ( type pagination struct { extraBodyContent common.MapStr - header *Header + header *headerConfig idField string requestField string urlField string @@ -24,7 +24,7 @@ type pagination struct { } func newPaginationFromConfig(config config) *pagination { - if !config.Pagination.IsEnabled() { + if !config.Pagination.isEnabled() { return nil } return &pagination{ diff --git a/x-pack/filebeat/input/httpjson/pagination_test.go b/x-pack/filebeat/input/httpjson/pagination_test.go index 9b04de75819..32e3261c1e6 100644 --- a/x-pack/filebeat/input/httpjson/pagination_test.go +++ b/x-pack/filebeat/input/httpjson/pagination_test.go @@ -42,7 +42,7 @@ func TestCreateRequestInfoFromBody(t *testing.T) { contentMap: common.MapStr{}, headers: common.MapStr{}, } - err := pagination.setRequestInfoFromBody( + _ = pagination.setRequestInfoFromBody( common.MapStr(m), common.MapStr(m), ri, diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index 57d206224ac..93c2b4a3fe7 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -122,7 +122,7 @@ func (r *rateLimiter) getRateLimit(header http.Header) (int64, error) { if err != nil { return 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err) } - if time.Unix(epoch, 0).Sub(time.Now()) <= 0 { + if time.Until(time.Unix(epoch, 0)) <= 0 { return 0, nil } diff --git a/x-pack/filebeat/input/httpjson/requester.go b/x-pack/filebeat/input/httpjson/requester.go index b5f58179aa0..df0a1efb1eb 100644 --- a/x-pack/filebeat/input/httpjson/requester.go +++ b/x-pack/filebeat/input/httpjson/requester.go @@ -14,7 +14,7 @@ import ( "net/http" "strings" - stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -40,6 +40,8 @@ type requester struct { authScheme string jsonObjects string splitEventsBy string + + cursorState cursorState } func newRequester( @@ -72,9 +74,9 @@ type response struct { } // processHTTPRequest processes HTTP request, and handles pagination if enabled -func (r *requester) processHTTPRequest(ctx context.Context, publisher stateless.Publisher) error { +func (r *requester) processHTTPRequest(ctx context.Context, publisher cursor.Publisher) error { ri := &requestInfo{ - url: r.dateCursor.getURL(), + url: r.dateCursor.getURL(r.cursorState.LastDateCursorValue), contentMap: common.MapStr{}, headers: r.headers, } @@ -166,7 +168,7 @@ func (r *requester) processHTTPRequest(ctx context.Context, publisher stateless. } if lastObj != nil && r.dateCursor.enabled { - r.dateCursor.advance(common.MapStr(lastObj)) + r.updateCursorState(ri.url, r.dateCursor.getNextValue(common.MapStr(lastObj))) } return nil @@ -210,7 +212,7 @@ func (r *requester) createHTTPRequest(ctx context.Context, ri *requestInfo) (*ht } // processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. -func (r *requester) processEventArray(publisher stateless.Publisher, events []interface{}) (map[string]interface{}, error) { +func (r *requester) processEventArray(publisher cursor.Publisher, events []interface{}) (map[string]interface{}, error) { var last map[string]interface{} for _, t := range events { switch v := t.(type) { @@ -221,7 +223,9 @@ func (r *requester) processEventArray(publisher stateless.Publisher, events []in if err != nil { return nil, fmt.Errorf("failed to marshal %+v: %w", e, err) } - publisher.Publish(makeEvent(string(d))) + if err := publisher.Publish(makeEvent(string(d)), r.cursorState); err != nil { + return nil, fmt.Errorf("failed to publish: %w", err) + } } default: return nil, fmt.Errorf("expected only JSON objects in the array but got a %T", v) @@ -273,3 +277,23 @@ func splitEvent(splitKey string, event map[string]interface{}) []map[string]inte return events } + +type cursorState struct { + LastCalledURL string + LastDateCursorValue string +} + +func (r *requester) updateCursorState(url, value string) { + r.cursorState.LastCalledURL = url + r.cursorState.LastDateCursorValue = value +} + +func (r *requester) loadCursor(c *cursor.Cursor, log *logp.Logger) { + if c == nil || c.IsNew() { + return + } + + if err := c.Unpack(&r.cursorState); err != nil { + log.Errorf("Reset http cursor state. Failed to read from registry: %v", err) + } +}