diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 83f91fe2e12..75ddd6cd31b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix counter for number of events published in `httpjson` input. {pull}31993[31993] - Fix handling of Checkpoint event for R81. {issue}32380[32380] {pull}32458[32458] +- gcp-pubsub input: Restart Pub/Sub client on all errors. {issue}32550[32550] {pull}32712[32712] *Heartbeat* diff --git a/x-pack/filebeat/input/gcppubsub/_meta/Dockerfile b/x-pack/filebeat/input/gcppubsub/_meta/Dockerfile index eea35d42374..9fa271e8838 100644 --- a/x-pack/filebeat/input/gcppubsub/_meta/Dockerfile +++ b/x-pack/filebeat/input/gcppubsub/_meta/Dockerfile @@ -28,6 +28,6 @@ RUN \ RUN \ mkdir /data -HEALTHCHECK --interval=1s --retries=90 CMD curl -s -f http://localhost:8432/ +HEALTHCHECK --interval=1s --retries=90 CMD curl -s -f --http2 http://localhost:8432/ CMD gcloud beta emulators pubsub start --data-dir /data --host-port "0.0.0.0:8432" diff --git a/x-pack/filebeat/input/gcppubsub/_meta/supported-versions.yml b/x-pack/filebeat/input/gcppubsub/_meta/supported-versions.yml index ac4ba96e699..afc1148bd83 100644 --- a/x-pack/filebeat/input/gcppubsub/_meta/supported-versions.yml +++ b/x-pack/filebeat/input/gcppubsub/_meta/supported-versions.yml @@ -1,2 +1,2 @@ variants: - - SDK_VERSION: 293.0.0-0 + - SDK_VERSION: 398.0.0-0 diff --git a/x-pack/filebeat/input/gcppubsub/docker-compose.yml b/x-pack/filebeat/input/gcppubsub/docker-compose.yml index 6afb855b8a4..60ce3ebd83f 100644 --- a/x-pack/filebeat/input/gcppubsub/docker-compose.yml +++ b/x-pack/filebeat/input/gcppubsub/docker-compose.yml @@ -2,10 +2,10 @@ version: '2.3' services: googlepubsub: - image: docker.elastic.co/integrations-ci/beats-googlepubsub:emulator-${SDK_VERSION:-293.0.0-0}-1 + image: docker.elastic.co/integrations-ci/beats-googlepubsub:emulator-${SDK_VERSION:-398.0.0-0}-1 build: context: ./_meta args: - SDK_VERSION: ${SDK_VERSION:-293.0.0-0} + SDK_VERSION: ${SDK_VERSION:-398.0.0-0} ports: - - 8432 + - '127.0.0.1:8432:8432' diff --git a/x-pack/filebeat/input/gcppubsub/input.go b/x-pack/filebeat/input/gcppubsub/input.go index 11a6a96ed2b..5a0ce6d93da 100644 --- a/x-pack/filebeat/input/gcppubsub/input.go +++ b/x-pack/filebeat/input/gcppubsub/input.go @@ -14,6 +14,7 @@ import ( "cloud.google.com/go/pubsub" "github.com/pkg/errors" + "golang.org/x/time/rate" "google.golang.org/api/option" "google.golang.org/grpc" @@ -32,6 +33,9 @@ import ( const ( inputName = "gcp-pubsub" oldInputName = "google-pubsub" + + // retryInterval is the minimum duration between pub/sub client retries. + retryInterval = 30 * time.Second ) func init() { @@ -139,9 +143,28 @@ func (in *pubsubInput) Run() { defer in.log.Info("Pub/Sub input worker has stopped.") defer in.workerWg.Done() defer in.workerCancel() - if err := in.run(); err != nil { - in.log.Error(err) - return + + // Throttle pubsub client restarts. + rt := rate.NewLimiter(rate.Every(retryInterval), 1) + + // Watchdog to keep the worker operating after an error. + for in.workerCtx.Err() == nil { + // Rate limit. + if err := rt.Wait(in.workerCtx); err != nil { + continue + } + + if err := in.run(); err != nil { + if in.workerCtx.Err() == nil { + in.log.Warnw("Restarting failed Pub/Sub input worker.", "error", err) + continue + } + + // Log any non-cancellation error before stopping. + if !errors.Is(err, context.Canceled) { + in.log.Errorw("Pub/Sub input worker failed.", "error", err) + } + } } }() })