Skip to content

Commit

Permalink
add error messages from filtered binding fetcher to the app log stream
Browse files Browse the repository at this point in the history
  • Loading branch information
corporatemax committed Dec 2, 2024
1 parent e270475 commit 56e1903
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/cmd/syslog-agent/app/syslog_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewSyslogAgent(
m,
cfg.WarnOnInvalidDrains,
l,
)
syslog.NewLoggregatorEmitter(logClient, "syslog_agent"))
cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata)
}

Expand Down
10 changes: 9 additions & 1 deletion src/pkg/ingress/bindings/filtered_binding_fetcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bindings

import (
"fmt"
"log"
"net"
"net/url"
Expand Down Expand Up @@ -34,9 +35,10 @@ type FilteredBindingFetcher struct {
invalidDrains metrics.Gauge
blacklistedDrains metrics.Gauge
failedHostsCache *simplecache.SimpleCache[string, bool]
emitter syslog.LoggregatorEmitter
}

func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger) *FilteredBindingFetcher {
func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger, emitter syslog.LoggregatorEmitter) *FilteredBindingFetcher {
opt := metrics.WithMetricLabels(map[string]string{"unit": "total"})

invalidDrains := m.NewGauge(
Expand All @@ -57,6 +59,7 @@ func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient,
invalidDrains: invalidDrains,
blacklistedDrains: blacklistedDrains,
failedHostsCache: simplecache.New[string, bool](120 * time.Second),
emitter: emitter,
}
}

Expand Down Expand Up @@ -87,19 +90,22 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) {

if invalidScheme(u.Scheme) {
f.printWarning("Invalid scheme %s in syslog drain url %s for application %s", u.Scheme, anonymousUrl.String(), b.AppId)
f.emitter.WriteLog(b.AppId, fmt.Sprintf("Invalid scheme %s in syslog drain url %s", u.Scheme, anonymousUrl.String()))
continue
}

if len(u.Host) == 0 {
invalidDrains += 1
f.printWarning("No hostname found in syslog drain url %s for application %s", anonymousUrl.String(), b.AppId)
f.emitter.WriteLog(b.AppId, fmt.Sprintf("No hostname found in syslog drain url %s", anonymousUrl.String()))
continue
}

_, exists := f.failedHostsCache.Get(u.Host)
if exists {
invalidDrains += 1
f.printWarning("Skipped resolve ip address for syslog drain with url %s for application %s due to prior failure", anonymousUrl.String(), b.AppId)
f.emitter.WriteLog(b.AppId, fmt.Sprintf("Skipped resolve ip address for syslog drain with url %s due to prior failure", anonymousUrl.String()))
continue
}

Expand All @@ -108,6 +114,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) {
invalidDrains += 1
f.failedHostsCache.Set(u.Host, true)
f.printWarning("Cannot resolve ip address for syslog drain with url %s for application %s", anonymousUrl.String(), b.AppId)
f.emitter.WriteLog(b.AppId, fmt.Sprintf("Cannot resolve ip address for syslog drain with url %s", anonymousUrl.String()))
continue
}

Expand All @@ -116,6 +123,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) {
invalidDrains += 1
blacklistedDrains += 1
f.printWarning("Resolved ip address for syslog drain with url %s for application %s is blacklisted", anonymousUrl.String(), b.AppId)
f.emitter.WriteLog(b.AppId, fmt.Sprintf("Resolved ip address for syslog drain with url %s is blacklisted", anonymousUrl.String()))
continue
}

Expand Down
50 changes: 10 additions & 40 deletions src/pkg/ingress/bindings/filtered_binding_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var _ = Describe("FilteredBindingFetcher", func() {
}
bindingReader := &SpyBindingReader{bindings: input}

filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log)
filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.LoggregatorEmitter{})
actual, err := filter.FetchBindings()

Expect(err).ToNot(HaveOccurred())
Expand All @@ -43,7 +43,7 @@ var _ = Describe("FilteredBindingFetcher", func() {
It("returns an error if the binding reader cannot fetch bindings", func() {
bindingReader := &SpyBindingReader{nil, errors.New("Woops")}

filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log)
filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.LoggregatorEmitter{})
actual, err := filter.FetchBindings()

Expect(err).To(HaveOccurred())
Expand All @@ -64,13 +64,7 @@ var _ = Describe("FilteredBindingFetcher", func() {
input := []syslog.Binding{
{AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "://"}},
}
filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{})
})

It("removes the binding", func() {
Expand Down Expand Up @@ -108,13 +102,7 @@ var _ = Describe("FilteredBindingFetcher", func() {
input := []syslog.Binding{
{AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "https:///path"}},
}
filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{})
})

It("removes the binding", func() {
Expand Down Expand Up @@ -163,13 +151,7 @@ var _ = Describe("FilteredBindingFetcher", func() {
})

JustBeforeEach(func() {
filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{})
})

It("ignores the bindings", func() {
Expand Down Expand Up @@ -210,13 +192,7 @@ var _ = Describe("FilteredBindingFetcher", func() {
input := []syslog.Binding{
{AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}},
}
filter = bindings.NewFilteredBindingFetcher(
mockic,
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
filter = bindings.NewFilteredBindingFetcher(mockic, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{})
})

It("removes bindings that failed to resolve", func() {
Expand Down Expand Up @@ -272,16 +248,10 @@ var _ = Describe("FilteredBindingFetcher", func() {
{AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}},
}

filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{
checkBlacklistError: errors.New("blacklist error"),
resolvedIP: net.ParseIP("127.0.0.1"),
},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{
checkBlacklistError: errors.New("blacklist error"),
resolvedIP: net.ParseIP("127.0.0.1"),
}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{})
})

It("removes the binding", func() {
Expand Down

0 comments on commit 56e1903

Please sign in to comment.