From e270475f0fce4e9fb477becb2a04b541d4c8e3a5 Mon Sep 17 00:00:00 2001 From: Maximilian Stefanac <137301184+corporatemax@users.noreply.github.com> Date: Mon, 25 Nov 2024 09:58:22 +0100 Subject: [PATCH 1/6] add error messages from syslog drains to the app log stream --- src/cmd/syslog-agent/app/syslog_agent.go | 27 +++++----- src/pkg/egress/syslog/loggregator_emitter.go | 40 +++++++++++++++ .../egress/syslog/loggregator_emitter_test.go | 44 ++++++++++++++++ src/pkg/egress/syslog/retry_writer.go | 5 ++ src/pkg/egress/syslog/retry_writer_test.go | 3 +- src/pkg/egress/syslog/syslog_connector.go | 50 ++++--------------- .../egress/syslog/syslog_connector_test.go | 9 ++-- src/pkg/egress/syslog/writer_factory.go | 12 +++-- src/pkg/egress/syslog/writer_factory_test.go | 18 +++---- 9 files changed, 138 insertions(+), 70 deletions(-) create mode 100644 src/pkg/egress/syslog/loggregator_emitter.go create mode 100644 src/pkg/egress/syslog/loggregator_emitter_test.go diff --git a/src/cmd/syslog-agent/app/syslog_agent.go b/src/cmd/syslog-agent/app/syslog_agent.go index 277bbdeb1..eae083652 100644 --- a/src/cmd/syslog-agent/app/syslog_agent.go +++ b/src/cmd/syslog-agent/app/syslog_agent.go @@ -57,18 +57,6 @@ func NewSyslogAgent( m Metrics, l *log.Logger, ) *SyslogAgent { - internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg) - writerFactory := syslog.NewWriterFactory( - internalTlsConfig, - externalTlsConfig, - syslog.NetworkTimeoutConfig{ - Keepalive: 10 * time.Second, - DialTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - }, - m, - ) - ingressTLSConfig, err := loggregator.NewIngressTLSConfig( cfg.GRPC.CAFile, cfg.GRPC.CertFile, @@ -86,12 +74,25 @@ func NewSyslogAgent( l.Panicf("failed to create log client for syslog connector: %q", err) } + internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg) + writerFactory := syslog.NewWriterFactory( + internalTlsConfig, + externalTlsConfig, + syslog.NetworkTimeoutConfig{ + Keepalive: 10 * time.Second, + DialTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + }, + m, + syslog.NewLoggregatorEmitter(logClient, "syslog_agent"), + ) + connector := syslog.NewSyslogConnector( cfg.DrainSkipCertVerify, timeoutwaitgroup.New(time.Minute), writerFactory, m, - syslog.WithLogClient(logClient, "syslog_agent"), + syslog.WithLoggregatorEmitter(syslog.NewLoggregatorEmitter(logClient, "syslog_agent")), ) var cacheClient *cache.CacheClient diff --git a/src/pkg/egress/syslog/loggregator_emitter.go b/src/pkg/egress/syslog/loggregator_emitter.go new file mode 100644 index 000000000..63347df54 --- /dev/null +++ b/src/pkg/egress/syslog/loggregator_emitter.go @@ -0,0 +1,40 @@ +package syslog + +import ( + "code.cloudfoundry.org/go-loggregator/v10" +) + +// LogClient is used to emit logs. +type LogClient interface { + EmitLog(message string, opts ...loggregator.EmitLogOption) +} + +type LoggregatorEmitter struct { + logClient LogClient + sourceIndex string +} + +// WriteLog writes a message in the application log stream using a LogClient. +func (appLogEmitter *LoggregatorEmitter) WriteLog(appID string, message string) { + if appLogEmitter.logClient == nil || appID == "" { + return + } + + option := loggregator.WithAppInfo(appID, "LGR", "") + appLogEmitter.logClient.EmitLog(message, option) + + option = loggregator.WithAppInfo( + appID, + "SYS", + appLogEmitter.sourceIndex, + ) + appLogEmitter.logClient.EmitLog(message, option) +} + +// NewLoggregatorEmitter creates a new LoggregatorEmitter. +func NewLoggregatorEmitter(logClient LogClient, sourceIndex string) LoggregatorEmitter { + return LoggregatorEmitter{ + logClient: logClient, + sourceIndex: sourceIndex, + } +} diff --git a/src/pkg/egress/syslog/loggregator_emitter_test.go b/src/pkg/egress/syslog/loggregator_emitter_test.go new file mode 100644 index 000000000..558a68bd8 --- /dev/null +++ b/src/pkg/egress/syslog/loggregator_emitter_test.go @@ -0,0 +1,44 @@ +package syslog_test + +import ( + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Loggregator Emitter", func() { + Describe("WriteLog()", func() { + It("emits a log message", func() { + logClient := NewSpyLogClient() + emitter := syslog.NewLoggregatorEmitter(logClient, "0") + + emitter.WriteLog("app-id", "some-message") + + messages := logClient.message() + appIDs := logClient.appID() + sourceTypes := logClient.sourceType() + Expect(messages).To(HaveLen(2)) + Expect(messages[0]).To(Equal("some-message")) + Expect(messages[1]).To(Equal("some-message")) + Expect(appIDs[0]).To(Equal("app-id")) + Expect(appIDs[1]).To(Equal("app-id")) + Expect(sourceTypes).To(HaveKey("LGR")) + Expect(sourceTypes).To(HaveKey("SYS")) + }) + + It("does not emit a log message if the appID is empty", func() { + logClient := NewSpyLogClient() + emitter := syslog.NewLoggregatorEmitter(logClient, "0") + + emitter.WriteLog("", "some-message") + + messages := logClient.message() + appIDs := logClient.appID() + sourceTypes := logClient.sourceType() + Expect(messages).To(HaveLen(0)) + Expect(appIDs).To(HaveLen(0)) + Expect(sourceTypes).ToNot(HaveKey("LGR")) + Expect(sourceTypes).ToNot(HaveKey("SYS")) + }) + }) +}) diff --git a/src/pkg/egress/syslog/retry_writer.go b/src/pkg/egress/syslog/retry_writer.go index c97e81cb0..f5c31dc32 100644 --- a/src/pkg/egress/syslog/retry_writer.go +++ b/src/pkg/egress/syslog/retry_writer.go @@ -1,6 +1,7 @@ package syslog import ( + "fmt" "log" "math" "time" @@ -21,6 +22,7 @@ type RetryWriter struct { retryDuration RetryDuration maxRetries int binding *URLBinding + emitter LoggregatorEmitter } func NewRetryWriter( @@ -28,12 +30,14 @@ func NewRetryWriter( retryDuration RetryDuration, maxRetries int, writer egress.WriteCloser, + emitter LoggregatorEmitter, ) (egress.WriteCloser, error) { return &RetryWriter{ Writer: writer, retryDuration: retryDuration, maxRetries: maxRetries, binding: urlBinding, + emitter: emitter, }, nil } @@ -55,6 +59,7 @@ func (r *RetryWriter) Write(e *loggregator_v2.Envelope) error { sleepDuration := r.retryDuration(i) log.Printf(logTemplate, r.binding.URL.Host, sleepDuration, err) + r.emitter.WriteLog(e.SourceId, fmt.Sprintf(logTemplate, r.binding.URL.Host, sleepDuration, err)) time.Sleep(sleepDuration) } diff --git a/src/pkg/egress/syslog/retry_writer_test.go b/src/pkg/egress/syslog/retry_writer_test.go index 65c4752f2..64299bd37 100644 --- a/src/pkg/egress/syslog/retry_writer_test.go +++ b/src/pkg/egress/syslog/retry_writer_test.go @@ -175,7 +175,7 @@ type spyLogClient struct { _sourceInstance map[string]struct{} } -func newSpyLogClient() *spyLogClient { +func NewSpyLogClient() *spyLogClient { return &spyLogClient{ _sourceType: make(map[string]struct{}), _sourceInstance: make(map[string]struct{}), @@ -257,5 +257,6 @@ func buildRetryWriter( syslog.RetryDuration(buildDelay(delayMultiplier)), maxRetries, w, + syslog.LoggregatorEmitter{}, ) } diff --git a/src/pkg/egress/syslog/syslog_connector.go b/src/pkg/egress/syslog/syslog_connector.go index bb06b34a1..50a58858e 100644 --- a/src/pkg/egress/syslog/syslog_connector.go +++ b/src/pkg/egress/syslog/syslog_connector.go @@ -2,14 +2,12 @@ package syslog import ( "fmt" - "log" - "golang.org/x/net/context" + "log" metrics "code.cloudfoundry.org/go-metric-registry" "code.cloudfoundry.org/go-diodes" - "code.cloudfoundry.org/go-loggregator/v10" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" ) @@ -33,32 +31,20 @@ type Credentials struct { CA string `json:"ca"` } -// LogClient is used to emit logs. -type LogClient interface { - EmitLog(message string, opts ...loggregator.EmitLogOption) -} - -// nullLogClient ensures that the LogClient is in fact optional. -type nullLogClient struct{} - -// EmitLog drops all messages into /dev/null. -func (nullLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { -} - type writerFactory interface { - NewWriter(*URLBinding) (egress.WriteCloser, error) + NewWriter(*URLBinding, LoggregatorEmitter) (egress.WriteCloser, error) } // SyslogConnector creates the various egress syslog writers. type SyslogConnector struct { skipCertVerify bool - logClient LogClient wg egress.WaitGroup - sourceIndex string writerFactory writerFactory + metricClient metricClient - metricClient metricClient droppedMetric metrics.Counter + + loggregatorEmitter LoggregatorEmitter } // NewSyslogConnector configures and returns a new SyslogConnector. @@ -78,7 +64,6 @@ func NewSyslogConnector( sc := &SyslogConnector{ skipCertVerify: skipCertVerify, wg: wg, - logClient: nullLogClient{}, writerFactory: f, metricClient: m, @@ -93,12 +78,11 @@ func NewSyslogConnector( // ConnectorOption allows a syslog connector to be customized. type ConnectorOption func(*SyslogConnector) -// WithLogClient returns a ConnectorOption that will set up logging for any +// WithLoggregatorEmitter returns a ConnectorOption that will set up logging for any // information about a binding. -func WithLogClient(logClient LogClient, sourceIndex string) ConnectorOption { +func WithLoggregatorEmitter(emitter LoggregatorEmitter) ConnectorOption { return func(sc *SyslogConnector) { - sc.logClient = logClient - sc.sourceIndex = sourceIndex + sc.loggregatorEmitter = emitter } } @@ -110,7 +94,7 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer return nil, err } - writer, err := w.writerFactory.NewWriter(urlBinding) + writer, err := w.writerFactory.NewWriter(urlBinding, w.loggregatorEmitter) if err != nil { return nil, err } @@ -138,7 +122,7 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer w.droppedMetric.Add(float64(missed)) drainDroppedMetric.Add(float64(missed)) - w.emitLoggregatorErrorLog(b.AppId, fmt.Sprintf("%d messages lost for application %s in user provided syslog drain with url %s", missed, b.AppId, anonymousUrl.String())) + w.loggregatorEmitter.WriteLog(b.AppId, fmt.Sprintf("%d messages lost for application %s in user provided syslog drain with url %s", missed, b.AppId, anonymousUrl.String())) w.emitStandardOutErrorLog(b.AppId, urlBinding.Scheme(), anonymousUrl.String(), missed) }), w.wg) @@ -151,20 +135,6 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer return filteredWriter, nil } -func (w *SyslogConnector) emitLoggregatorErrorLog(appID, message string) { - if appID == "" { - return - } - option := loggregator.WithAppInfo(appID, "LGR", "") - w.logClient.EmitLog(message, option) - - option = loggregator.WithAppInfo( - appID, - "SYS", - w.sourceIndex, - ) - w.logClient.EmitLog(message, option) -} func (w *SyslogConnector) emitStandardOutErrorLog(appID, scheme, url string, missed int) { errorAppOrAggregate := fmt.Sprintf("for %s's app drain", appID) if appID == "" { diff --git a/src/pkg/egress/syslog/syslog_connector_test.go b/src/pkg/egress/syslog/syslog_connector_test.go index cb58a740d..40b15b946 100644 --- a/src/pkg/egress/syslog/syslog_connector_test.go +++ b/src/pkg/egress/syslog/syslog_connector_test.go @@ -172,13 +172,13 @@ var _ = Describe("SyslogConnector", func() { }) It("emits a LGR and SYS log to the log client about logs that have been dropped", func() { - logClient := newSpyLogClient() + logClient := NewSpyLogClient() connector := syslog.NewSyslogConnector( true, spyWaitGroup, writerFactory, sm, - syslog.WithLogClient(logClient, "3"), + syslog.WithLoggregatorEmitter(syslog.NewLoggregatorEmitter(logClient, "3")), ) binding := syslog.Binding{AppId: "app-id", @@ -214,13 +214,13 @@ var _ = Describe("SyslogConnector", func() { }) It("doesn't emit LGR and SYS log to the log client about aggregate drains drops", func() { - logClient := newSpyLogClient() + logClient := NewSpyLogClient() connector := syslog.NewSyslogConnector( true, spyWaitGroup, writerFactory, sm, - syslog.WithLogClient(logClient, "3"), + syslog.WithLoggregatorEmitter(syslog.NewLoggregatorEmitter(logClient, "3")), ) binding := syslog.Binding{Drain: syslog.Drain{Url: "dropping://"}} @@ -276,6 +276,7 @@ type stubWriterFactory struct { func (f *stubWriterFactory) NewWriter( urlBinding *syslog.URLBinding, + emitter syslog.LoggregatorEmitter, ) (egress.WriteCloser, error) { f.called = true return f.writer, f.err diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index 8ec8249ab..5a43b139b 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -41,18 +41,20 @@ type WriterFactory struct { externalTlsConfig *tls.Config netConf NetworkTimeoutConfig m metricClient + emitter LoggregatorEmitter } -func NewWriterFactory(internalTlsConfig *tls.Config, externalTlsConfig *tls.Config, netConf NetworkTimeoutConfig, m metricClient) WriterFactory { +func NewWriterFactory(internalTlsConfig *tls.Config, externalTlsConfig *tls.Config, netConf NetworkTimeoutConfig, m metricClient, emitter LoggregatorEmitter) WriterFactory { return WriterFactory{ internalTlsConfig: internalTlsConfig, externalTlsConfig: externalTlsConfig, netConf: netConf, m: m, + emitter: emitter, } } -func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { +func (f WriterFactory) NewWriter(ub *URLBinding, emitter LoggregatorEmitter) (egress.WriteCloser, error) { tlsCfg := f.externalTlsConfig.Clone() if ub.InternalTls { tlsCfg = f.internalTlsConfig.Clone() @@ -60,7 +62,9 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { if len(ub.Certificate) > 0 && len(ub.PrivateKey) > 0 { cert, err := tls.X509KeyPair(ub.Certificate, ub.PrivateKey) if err != nil { - err = NewWriterFactoryErrorf(ub.URL, "failed to load certificate: %s", err.Error()) + errorMessage := err.Error() + err = NewWriterFactoryErrorf(ub.URL, "failed to load certificate: %s", errorMessage) + f.emitter.WriteLog(ub.AppID, fmt.Sprintf("failed to load certificate: %s", errorMessage)) return nil, err } tlsCfg.Certificates = []tls.Certificate{cert} @@ -69,6 +73,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { ok := tlsCfg.RootCAs.AppendCertsFromPEM(ub.CA) if !ok { err := NewWriterFactoryErrorf(ub.URL, "failed to load root CA") + f.emitter.WriteLog(ub.AppID, "failed to load root CA") return nil, err } } @@ -140,5 +145,6 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { ExponentialDuration, maxRetries, w, + emitter, ) } diff --git a/src/pkg/egress/syslog/writer_factory_test.go b/src/pkg/egress/syslog/writer_factory_test.go index 5707bbf56..3078e4b90 100644 --- a/src/pkg/egress/syslog/writer_factory_test.go +++ b/src/pkg/egress/syslog/writer_factory_test.go @@ -19,7 +19,7 @@ var _ = Describe("EgressFactory", func() { BeforeEach(func() { sm = metricsHelpers.NewMetricsRegistry() - f = syslog.NewWriterFactory(&tls.Config{}, &tls.Config{}, syslog.NetworkTimeoutConfig{}, sm) //nolint:gosec + f = syslog.NewWriterFactory(&tls.Config{}, &tls.Config{}, syslog.NetworkTimeoutConfig{}, sm, syslog.LoggregatorEmitter{}) //nolint:gosec }) Context("when the url begins with https", func() { @@ -30,7 +30,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -49,7 +49,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -68,7 +68,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -87,7 +87,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -106,7 +106,7 @@ var _ = Describe("EgressFactory", func() { Certificate: []byte("invalid-certificate"), } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) Expect(err).ToNot(HaveOccurred()) }) }) @@ -120,7 +120,7 @@ var _ = Describe("EgressFactory", func() { PrivateKey: []byte("invalid-private-key"), } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) Expect(err).ToNot(HaveOccurred()) }) }) @@ -143,7 +143,7 @@ var _ = Describe("EgressFactory", func() { urlBinding.CA = []byte("invalid-ca") } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) Expect(err).To(MatchError(expectedErr)) }, @@ -169,7 +169,7 @@ var _ = Describe("EgressFactory", func() { AppID: appID, } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) Expect(err).ToNot(HaveOccurred()) metric := sm.GetMetric("egress", tags) From 56e1903b12057369d0159538ec6eaae992689e8c Mon Sep 17 00:00:00 2001 From: Maximilian Stefanac <137301184+corporatemax@users.noreply.github.com> Date: Mon, 25 Nov 2024 10:44:52 +0100 Subject: [PATCH 2/6] add error messages from filtered binding fetcher to the app log stream --- src/cmd/syslog-agent/app/syslog_agent.go | 2 +- .../bindings/filtered_binding_fetcher.go | 10 +++- .../bindings/filtered_binding_fetcher_test.go | 50 ++++--------------- 3 files changed, 20 insertions(+), 42 deletions(-) diff --git a/src/cmd/syslog-agent/app/syslog_agent.go b/src/cmd/syslog-agent/app/syslog_agent.go index eae083652..cd2a1017c 100644 --- a/src/cmd/syslog-agent/app/syslog_agent.go +++ b/src/cmd/syslog-agent/app/syslog_agent.go @@ -113,7 +113,7 @@ func NewSyslogAgent( m, cfg.WarnOnInvalidDrains, l, - ) + syslog.NewLoggregatorEmitter(logClient, "syslog_agent")) cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata) } diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher.go b/src/pkg/ingress/bindings/filtered_binding_fetcher.go index 0921c95c3..044fa5f8e 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher.go @@ -1,6 +1,7 @@ package bindings import ( + "fmt" "log" "net" "net/url" @@ -34,9 +35,10 @@ type FilteredBindingFetcher struct { invalidDrains metrics.Gauge blacklistedDrains metrics.Gauge failedHostsCache *simplecache.SimpleCache[string, bool] + emitter syslog.LoggregatorEmitter } -func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger) *FilteredBindingFetcher { +func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger, emitter syslog.LoggregatorEmitter) *FilteredBindingFetcher { opt := metrics.WithMetricLabels(map[string]string{"unit": "total"}) invalidDrains := m.NewGauge( @@ -57,6 +59,7 @@ func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, invalidDrains: invalidDrains, blacklistedDrains: blacklistedDrains, failedHostsCache: simplecache.New[string, bool](120 * time.Second), + emitter: emitter, } } @@ -87,12 +90,14 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { if invalidScheme(u.Scheme) { f.printWarning("Invalid scheme %s in syslog drain url %s for application %s", u.Scheme, anonymousUrl.String(), b.AppId) + f.emitter.WriteLog(b.AppId, fmt.Sprintf("Invalid scheme %s in syslog drain url %s", u.Scheme, anonymousUrl.String())) continue } if len(u.Host) == 0 { invalidDrains += 1 f.printWarning("No hostname found in syslog drain url %s for application %s", anonymousUrl.String(), b.AppId) + f.emitter.WriteLog(b.AppId, fmt.Sprintf("No hostname found in syslog drain url %s", anonymousUrl.String())) continue } @@ -100,6 +105,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { if exists { invalidDrains += 1 f.printWarning("Skipped resolve ip address for syslog drain with url %s for application %s due to prior failure", anonymousUrl.String(), b.AppId) + f.emitter.WriteLog(b.AppId, fmt.Sprintf("Skipped resolve ip address for syslog drain with url %s due to prior failure", anonymousUrl.String())) continue } @@ -108,6 +114,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { invalidDrains += 1 f.failedHostsCache.Set(u.Host, true) f.printWarning("Cannot resolve ip address for syslog drain with url %s for application %s", anonymousUrl.String(), b.AppId) + f.emitter.WriteLog(b.AppId, fmt.Sprintf("Cannot resolve ip address for syslog drain with url %s", anonymousUrl.String())) continue } @@ -116,6 +123,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { invalidDrains += 1 blacklistedDrains += 1 f.printWarning("Resolved ip address for syslog drain with url %s for application %s is blacklisted", anonymousUrl.String(), b.AppId) + f.emitter.WriteLog(b.AppId, fmt.Sprintf("Resolved ip address for syslog drain with url %s is blacklisted", anonymousUrl.String())) continue } diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go index c217d4e84..15bd381a3 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go @@ -33,7 +33,7 @@ var _ = Describe("FilteredBindingFetcher", func() { } bindingReader := &SpyBindingReader{bindings: input} - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.LoggregatorEmitter{}) actual, err := filter.FetchBindings() Expect(err).ToNot(HaveOccurred()) @@ -43,7 +43,7 @@ var _ = Describe("FilteredBindingFetcher", func() { It("returns an error if the binding reader cannot fetch bindings", func() { bindingReader := &SpyBindingReader{nil, errors.New("Woops")} - filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log) + filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.LoggregatorEmitter{}) actual, err := filter.FetchBindings() Expect(err).To(HaveOccurred()) @@ -64,13 +64,7 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "://"}}, } - filter = bindings.NewFilteredBindingFetcher( - &spyIPChecker{}, - &SpyBindingReader{bindings: input}, - metrics, - warn, - log, - ) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) }) It("removes the binding", func() { @@ -108,13 +102,7 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "https:///path"}}, } - filter = bindings.NewFilteredBindingFetcher( - &spyIPChecker{}, - &SpyBindingReader{bindings: input}, - metrics, - warn, - log, - ) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) }) It("removes the binding", func() { @@ -163,13 +151,7 @@ var _ = Describe("FilteredBindingFetcher", func() { }) JustBeforeEach(func() { - filter = bindings.NewFilteredBindingFetcher( - &spyIPChecker{}, - &SpyBindingReader{bindings: input}, - metrics, - warn, - log, - ) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) }) It("ignores the bindings", func() { @@ -210,13 +192,7 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}}, } - filter = bindings.NewFilteredBindingFetcher( - mockic, - &SpyBindingReader{bindings: input}, - metrics, - warn, - log, - ) + filter = bindings.NewFilteredBindingFetcher(mockic, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) }) It("removes bindings that failed to resolve", func() { @@ -272,16 +248,10 @@ var _ = Describe("FilteredBindingFetcher", func() { {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}}, } - filter = bindings.NewFilteredBindingFetcher( - &spyIPChecker{ - checkBlacklistError: errors.New("blacklist error"), - resolvedIP: net.ParseIP("127.0.0.1"), - }, - &SpyBindingReader{bindings: input}, - metrics, - warn, - log, - ) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{ + checkBlacklistError: errors.New("blacklist error"), + resolvedIP: net.ParseIP("127.0.0.1"), + }, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) }) It("removes the binding", func() { From 8e9af200cb4e53161f8cd47e9f6e60c10b012a7d Mon Sep 17 00:00:00 2001 From: Maximilian Stefanac <137301184+corporatemax@users.noreply.github.com> Date: Mon, 25 Nov 2024 17:19:55 +0100 Subject: [PATCH 3/6] rename LoggregatorEmitter to AppLogEmitter --- ...ggregator_emitter.go => app_log_emitter.go} | 10 +++++----- ...emitter_test.go => app_log_emitter_test.go} | 0 src/pkg/egress/syslog/retry_writer.go | 4 ++-- src/pkg/egress/syslog/retry_writer_test.go | 2 +- src/pkg/egress/syslog/syslog_connector.go | 6 +++--- src/pkg/egress/syslog/syslog_connector_test.go | 2 +- src/pkg/egress/syslog/writer_factory.go | 6 +++--- src/pkg/egress/syslog/writer_factory_test.go | 18 +++++++++--------- .../bindings/filtered_binding_fetcher.go | 4 ++-- .../bindings/filtered_binding_fetcher_test.go | 14 +++++++------- 10 files changed, 33 insertions(+), 33 deletions(-) rename src/pkg/egress/syslog/{loggregator_emitter.go => app_log_emitter.go} (76%) rename src/pkg/egress/syslog/{loggregator_emitter_test.go => app_log_emitter_test.go} (100%) diff --git a/src/pkg/egress/syslog/loggregator_emitter.go b/src/pkg/egress/syslog/app_log_emitter.go similarity index 76% rename from src/pkg/egress/syslog/loggregator_emitter.go rename to src/pkg/egress/syslog/app_log_emitter.go index 63347df54..8101fe030 100644 --- a/src/pkg/egress/syslog/loggregator_emitter.go +++ b/src/pkg/egress/syslog/app_log_emitter.go @@ -9,13 +9,13 @@ type LogClient interface { EmitLog(message string, opts ...loggregator.EmitLogOption) } -type LoggregatorEmitter struct { +type AppLogEmitter struct { logClient LogClient sourceIndex string } // WriteLog writes a message in the application log stream using a LogClient. -func (appLogEmitter *LoggregatorEmitter) WriteLog(appID string, message string) { +func (appLogEmitter *AppLogEmitter) WriteLog(appID string, message string) { if appLogEmitter.logClient == nil || appID == "" { return } @@ -31,9 +31,9 @@ func (appLogEmitter *LoggregatorEmitter) WriteLog(appID string, message string) appLogEmitter.logClient.EmitLog(message, option) } -// NewLoggregatorEmitter creates a new LoggregatorEmitter. -func NewLoggregatorEmitter(logClient LogClient, sourceIndex string) LoggregatorEmitter { - return LoggregatorEmitter{ +// NewLoggregatorEmitter creates a new AppLogEmitter. +func NewLoggregatorEmitter(logClient LogClient, sourceIndex string) AppLogEmitter { + return AppLogEmitter{ logClient: logClient, sourceIndex: sourceIndex, } diff --git a/src/pkg/egress/syslog/loggregator_emitter_test.go b/src/pkg/egress/syslog/app_log_emitter_test.go similarity index 100% rename from src/pkg/egress/syslog/loggregator_emitter_test.go rename to src/pkg/egress/syslog/app_log_emitter_test.go diff --git a/src/pkg/egress/syslog/retry_writer.go b/src/pkg/egress/syslog/retry_writer.go index f5c31dc32..bf2b6f4df 100644 --- a/src/pkg/egress/syslog/retry_writer.go +++ b/src/pkg/egress/syslog/retry_writer.go @@ -22,7 +22,7 @@ type RetryWriter struct { retryDuration RetryDuration maxRetries int binding *URLBinding - emitter LoggregatorEmitter + emitter AppLogEmitter } func NewRetryWriter( @@ -30,7 +30,7 @@ func NewRetryWriter( retryDuration RetryDuration, maxRetries int, writer egress.WriteCloser, - emitter LoggregatorEmitter, + emitter AppLogEmitter, ) (egress.WriteCloser, error) { return &RetryWriter{ Writer: writer, diff --git a/src/pkg/egress/syslog/retry_writer_test.go b/src/pkg/egress/syslog/retry_writer_test.go index 64299bd37..89a769c38 100644 --- a/src/pkg/egress/syslog/retry_writer_test.go +++ b/src/pkg/egress/syslog/retry_writer_test.go @@ -257,6 +257,6 @@ func buildRetryWriter( syslog.RetryDuration(buildDelay(delayMultiplier)), maxRetries, w, - syslog.LoggregatorEmitter{}, + syslog.AppLogEmitter{}, ) } diff --git a/src/pkg/egress/syslog/syslog_connector.go b/src/pkg/egress/syslog/syslog_connector.go index 50a58858e..7a5b216c6 100644 --- a/src/pkg/egress/syslog/syslog_connector.go +++ b/src/pkg/egress/syslog/syslog_connector.go @@ -32,7 +32,7 @@ type Credentials struct { } type writerFactory interface { - NewWriter(*URLBinding, LoggregatorEmitter) (egress.WriteCloser, error) + NewWriter(*URLBinding, AppLogEmitter) (egress.WriteCloser, error) } // SyslogConnector creates the various egress syslog writers. @@ -44,7 +44,7 @@ type SyslogConnector struct { droppedMetric metrics.Counter - loggregatorEmitter LoggregatorEmitter + loggregatorEmitter AppLogEmitter } // NewSyslogConnector configures and returns a new SyslogConnector. @@ -80,7 +80,7 @@ type ConnectorOption func(*SyslogConnector) // WithLoggregatorEmitter returns a ConnectorOption that will set up logging for any // information about a binding. -func WithLoggregatorEmitter(emitter LoggregatorEmitter) ConnectorOption { +func WithLoggregatorEmitter(emitter AppLogEmitter) ConnectorOption { return func(sc *SyslogConnector) { sc.loggregatorEmitter = emitter } diff --git a/src/pkg/egress/syslog/syslog_connector_test.go b/src/pkg/egress/syslog/syslog_connector_test.go index 40b15b946..a97e43d15 100644 --- a/src/pkg/egress/syslog/syslog_connector_test.go +++ b/src/pkg/egress/syslog/syslog_connector_test.go @@ -276,7 +276,7 @@ type stubWriterFactory struct { func (f *stubWriterFactory) NewWriter( urlBinding *syslog.URLBinding, - emitter syslog.LoggregatorEmitter, + emitter syslog.AppLogEmitter, ) (egress.WriteCloser, error) { f.called = true return f.writer, f.err diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index 5a43b139b..3a0b73958 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -41,10 +41,10 @@ type WriterFactory struct { externalTlsConfig *tls.Config netConf NetworkTimeoutConfig m metricClient - emitter LoggregatorEmitter + emitter AppLogEmitter } -func NewWriterFactory(internalTlsConfig *tls.Config, externalTlsConfig *tls.Config, netConf NetworkTimeoutConfig, m metricClient, emitter LoggregatorEmitter) WriterFactory { +func NewWriterFactory(internalTlsConfig *tls.Config, externalTlsConfig *tls.Config, netConf NetworkTimeoutConfig, m metricClient, emitter AppLogEmitter) WriterFactory { return WriterFactory{ internalTlsConfig: internalTlsConfig, externalTlsConfig: externalTlsConfig, @@ -54,7 +54,7 @@ func NewWriterFactory(internalTlsConfig *tls.Config, externalTlsConfig *tls.Conf } } -func (f WriterFactory) NewWriter(ub *URLBinding, emitter LoggregatorEmitter) (egress.WriteCloser, error) { +func (f WriterFactory) NewWriter(ub *URLBinding, emitter AppLogEmitter) (egress.WriteCloser, error) { tlsCfg := f.externalTlsConfig.Clone() if ub.InternalTls { tlsCfg = f.internalTlsConfig.Clone() diff --git a/src/pkg/egress/syslog/writer_factory_test.go b/src/pkg/egress/syslog/writer_factory_test.go index 3078e4b90..0ae34fa82 100644 --- a/src/pkg/egress/syslog/writer_factory_test.go +++ b/src/pkg/egress/syslog/writer_factory_test.go @@ -19,7 +19,7 @@ var _ = Describe("EgressFactory", func() { BeforeEach(func() { sm = metricsHelpers.NewMetricsRegistry() - f = syslog.NewWriterFactory(&tls.Config{}, &tls.Config{}, syslog.NetworkTimeoutConfig{}, sm, syslog.LoggregatorEmitter{}) //nolint:gosec + f = syslog.NewWriterFactory(&tls.Config{}, &tls.Config{}, syslog.NetworkTimeoutConfig{}, sm, syslog.AppLogEmitter{}) //nolint:gosec }) Context("when the url begins with https", func() { @@ -30,7 +30,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) + writer, err := f.NewWriter(urlBinding, syslog.AppLogEmitter{}) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -49,7 +49,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) + writer, err := f.NewWriter(urlBinding, syslog.AppLogEmitter{}) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -68,7 +68,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) + writer, err := f.NewWriter(urlBinding, syslog.AppLogEmitter{}) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -87,7 +87,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) + writer, err := f.NewWriter(urlBinding, syslog.AppLogEmitter{}) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -106,7 +106,7 @@ var _ = Describe("EgressFactory", func() { Certificate: []byte("invalid-certificate"), } - _, err = f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) + _, err = f.NewWriter(urlBinding, syslog.AppLogEmitter{}) Expect(err).ToNot(HaveOccurred()) }) }) @@ -120,7 +120,7 @@ var _ = Describe("EgressFactory", func() { PrivateKey: []byte("invalid-private-key"), } - _, err = f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) + _, err = f.NewWriter(urlBinding, syslog.AppLogEmitter{}) Expect(err).ToNot(HaveOccurred()) }) }) @@ -143,7 +143,7 @@ var _ = Describe("EgressFactory", func() { urlBinding.CA = []byte("invalid-ca") } - _, err = f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) + _, err = f.NewWriter(urlBinding, syslog.AppLogEmitter{}) Expect(err).To(MatchError(expectedErr)) }, @@ -169,7 +169,7 @@ var _ = Describe("EgressFactory", func() { AppID: appID, } - _, err = f.NewWriter(urlBinding, syslog.LoggregatorEmitter{}) + _, err = f.NewWriter(urlBinding, syslog.AppLogEmitter{}) Expect(err).ToNot(HaveOccurred()) metric := sm.GetMetric("egress", tags) diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher.go b/src/pkg/ingress/bindings/filtered_binding_fetcher.go index 044fa5f8e..2a7a8d30d 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher.go @@ -35,10 +35,10 @@ type FilteredBindingFetcher struct { invalidDrains metrics.Gauge blacklistedDrains metrics.Gauge failedHostsCache *simplecache.SimpleCache[string, bool] - emitter syslog.LoggregatorEmitter + emitter syslog.AppLogEmitter } -func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger, emitter syslog.LoggregatorEmitter) *FilteredBindingFetcher { +func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger, emitter syslog.AppLogEmitter) *FilteredBindingFetcher { opt := metrics.WithMetricLabels(map[string]string{"unit": "total"}) invalidDrains := m.NewGauge( diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go index 15bd381a3..eb0091659 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go @@ -33,7 +33,7 @@ var _ = Describe("FilteredBindingFetcher", func() { } bindingReader := &SpyBindingReader{bindings: input} - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.LoggregatorEmitter{}) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.AppLogEmitter{}) actual, err := filter.FetchBindings() Expect(err).ToNot(HaveOccurred()) @@ -43,7 +43,7 @@ var _ = Describe("FilteredBindingFetcher", func() { It("returns an error if the binding reader cannot fetch bindings", func() { bindingReader := &SpyBindingReader{nil, errors.New("Woops")} - filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.LoggregatorEmitter{}) + filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.AppLogEmitter{}) actual, err := filter.FetchBindings() Expect(err).To(HaveOccurred()) @@ -64,7 +64,7 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "://"}}, } - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) }) It("removes the binding", func() { @@ -102,7 +102,7 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "https:///path"}}, } - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) }) It("removes the binding", func() { @@ -151,7 +151,7 @@ var _ = Describe("FilteredBindingFetcher", func() { }) JustBeforeEach(func() { - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) + filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) }) It("ignores the bindings", func() { @@ -192,7 +192,7 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}}, } - filter = bindings.NewFilteredBindingFetcher(mockic, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) + filter = bindings.NewFilteredBindingFetcher(mockic, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) }) It("removes bindings that failed to resolve", func() { @@ -251,7 +251,7 @@ var _ = Describe("FilteredBindingFetcher", func() { filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{ checkBlacklistError: errors.New("blacklist error"), resolvedIP: net.ParseIP("127.0.0.1"), - }, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.LoggregatorEmitter{}) + }, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) }) It("removes the binding", func() { From 25b8f3dfd3cf0db68c94b88a50d0fe933398cfeb Mon Sep 17 00:00:00 2001 From: Maximilian Stefanac <137301184+corporatemax@users.noreply.github.com> Date: Mon, 25 Nov 2024 17:31:36 +0100 Subject: [PATCH 4/6] change format of NewFilteredBindingFetcher calls --- src/cmd/syslog-agent/app/syslog_agent.go | 3 +- .../bindings/filtered_binding_fetcher_test.go | 69 ++++++++++++++++--- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/src/cmd/syslog-agent/app/syslog_agent.go b/src/cmd/syslog-agent/app/syslog_agent.go index cd2a1017c..62c12c519 100644 --- a/src/cmd/syslog-agent/app/syslog_agent.go +++ b/src/cmd/syslog-agent/app/syslog_agent.go @@ -113,7 +113,8 @@ func NewSyslogAgent( m, cfg.WarnOnInvalidDrains, l, - syslog.NewLoggregatorEmitter(logClient, "syslog_agent")) + syslog.NewLoggregatorEmitter(logClient, "syslog_agent"), + ) cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata) } diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go index eb0091659..b7c6527d7 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go @@ -33,7 +33,14 @@ var _ = Describe("FilteredBindingFetcher", func() { } bindingReader := &SpyBindingReader{bindings: input} - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.AppLogEmitter{}) + filter = bindings.NewFilteredBindingFetcher( + &spyIPChecker{}, + bindingReader, + metrics, + true, + log, + syslog.AppLogEmitter{}, + ) actual, err := filter.FetchBindings() Expect(err).ToNot(HaveOccurred()) @@ -43,7 +50,14 @@ var _ = Describe("FilteredBindingFetcher", func() { It("returns an error if the binding reader cannot fetch bindings", func() { bindingReader := &SpyBindingReader{nil, errors.New("Woops")} - filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log, syslog.AppLogEmitter{}) + filter := bindings.NewFilteredBindingFetcher( + &spyIPChecker{}, + bindingReader, + metrics, + true, + log, + syslog.AppLogEmitter{}, + ) actual, err := filter.FetchBindings() Expect(err).To(HaveOccurred()) @@ -64,7 +78,14 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "://"}}, } - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) + filter = bindings.NewFilteredBindingFetcher( + &spyIPChecker{}, + &SpyBindingReader{bindings: input}, + metrics, + warn, + log, + syslog.AppLogEmitter{}, + ) }) It("removes the binding", func() { @@ -102,7 +123,14 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "https:///path"}}, } - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) + filter = bindings.NewFilteredBindingFetcher( + &spyIPChecker{}, + &SpyBindingReader{bindings: input}, + metrics, + warn, + log, + syslog.AppLogEmitter{}, + ) }) It("removes the binding", func() { @@ -151,7 +179,14 @@ var _ = Describe("FilteredBindingFetcher", func() { }) JustBeforeEach(func() { - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) + filter = bindings.NewFilteredBindingFetcher( + &spyIPChecker{}, + &SpyBindingReader{bindings: input}, + metrics, + warn, + log, + syslog.AppLogEmitter{}, + ) }) It("ignores the bindings", func() { @@ -192,7 +227,14 @@ var _ = Describe("FilteredBindingFetcher", func() { input := []syslog.Binding{ {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}}, } - filter = bindings.NewFilteredBindingFetcher(mockic, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) + filter = bindings.NewFilteredBindingFetcher( + mockic, + &SpyBindingReader{bindings: input}, + metrics, + warn, + log, + syslog.AppLogEmitter{}, + ) }) It("removes bindings that failed to resolve", func() { @@ -248,10 +290,17 @@ var _ = Describe("FilteredBindingFetcher", func() { {AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}}, } - filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{ - checkBlacklistError: errors.New("blacklist error"), - resolvedIP: net.ParseIP("127.0.0.1"), - }, &SpyBindingReader{bindings: input}, metrics, warn, log, syslog.AppLogEmitter{}) + filter = bindings.NewFilteredBindingFetcher( + &spyIPChecker{ + checkBlacklistError: errors.New("blacklist error"), + resolvedIP: net.ParseIP("127.0.0.1"), + }, + &SpyBindingReader{bindings: input}, + metrics, + warn, + log, + syslog.AppLogEmitter{}, + ) }) It("removes the binding", func() { From 1c03a021c940340668719c68cc2d8b4886a57565 Mon Sep 17 00:00:00 2001 From: Maximilian Stefanac <137301184+corporatemax@users.noreply.github.com> Date: Mon, 25 Nov 2024 17:50:53 +0100 Subject: [PATCH 5/6] rename references to AppLogEmitter --- src/cmd/syslog-agent/app/syslog_agent.go | 6 +++--- src/pkg/egress/syslog/app_log_emitter.go | 8 ++++---- src/pkg/egress/syslog/app_log_emitter_test.go | 10 +++++----- src/pkg/egress/syslog/retry_writer.go | 2 +- src/pkg/egress/syslog/syslog_connector.go | 12 ++++++------ src/pkg/egress/syslog/syslog_connector_test.go | 4 ++-- src/pkg/egress/syslog/writer_factory.go | 4 ++-- src/pkg/ingress/bindings/filtered_binding_fetcher.go | 10 +++++----- 8 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/cmd/syslog-agent/app/syslog_agent.go b/src/cmd/syslog-agent/app/syslog_agent.go index 62c12c519..810433d36 100644 --- a/src/cmd/syslog-agent/app/syslog_agent.go +++ b/src/cmd/syslog-agent/app/syslog_agent.go @@ -84,7 +84,7 @@ func NewSyslogAgent( WriteTimeout: 10 * time.Second, }, m, - syslog.NewLoggregatorEmitter(logClient, "syslog_agent"), + syslog.NewAppLogEmitter(logClient, "syslog_agent"), ) connector := syslog.NewSyslogConnector( @@ -92,7 +92,7 @@ func NewSyslogAgent( timeoutwaitgroup.New(time.Minute), writerFactory, m, - syslog.WithLoggregatorEmitter(syslog.NewLoggregatorEmitter(logClient, "syslog_agent")), + syslog.WithAppLogEmitter(syslog.NewAppLogEmitter(logClient, "syslog_agent")), ) var cacheClient *cache.CacheClient @@ -113,7 +113,7 @@ func NewSyslogAgent( m, cfg.WarnOnInvalidDrains, l, - syslog.NewLoggregatorEmitter(logClient, "syslog_agent"), + syslog.NewAppLogEmitter(logClient, "syslog_agent"), ) cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata) } diff --git a/src/pkg/egress/syslog/app_log_emitter.go b/src/pkg/egress/syslog/app_log_emitter.go index 8101fe030..a31e17ec5 100644 --- a/src/pkg/egress/syslog/app_log_emitter.go +++ b/src/pkg/egress/syslog/app_log_emitter.go @@ -14,8 +14,8 @@ type AppLogEmitter struct { sourceIndex string } -// WriteLog writes a message in the application log stream using a LogClient. -func (appLogEmitter *AppLogEmitter) WriteLog(appID string, message string) { +// EmitLog writes a message in the application log stream using a LogClient. +func (appLogEmitter *AppLogEmitter) EmitLog(appID string, message string) { if appLogEmitter.logClient == nil || appID == "" { return } @@ -31,8 +31,8 @@ func (appLogEmitter *AppLogEmitter) WriteLog(appID string, message string) { appLogEmitter.logClient.EmitLog(message, option) } -// NewLoggregatorEmitter creates a new AppLogEmitter. -func NewLoggregatorEmitter(logClient LogClient, sourceIndex string) AppLogEmitter { +// NewAppLogEmitter creates a new AppLogEmitter. +func NewAppLogEmitter(logClient LogClient, sourceIndex string) AppLogEmitter { return AppLogEmitter{ logClient: logClient, sourceIndex: sourceIndex, diff --git a/src/pkg/egress/syslog/app_log_emitter_test.go b/src/pkg/egress/syslog/app_log_emitter_test.go index 558a68bd8..83904f80c 100644 --- a/src/pkg/egress/syslog/app_log_emitter_test.go +++ b/src/pkg/egress/syslog/app_log_emitter_test.go @@ -7,12 +7,12 @@ import ( ) var _ = Describe("Loggregator Emitter", func() { - Describe("WriteLog()", func() { + Describe("EmitLog()", func() { It("emits a log message", func() { logClient := NewSpyLogClient() - emitter := syslog.NewLoggregatorEmitter(logClient, "0") + emitter := syslog.NewAppLogEmitter(logClient, "0") - emitter.WriteLog("app-id", "some-message") + emitter.EmitLog("app-id", "some-message") messages := logClient.message() appIDs := logClient.appID() @@ -28,9 +28,9 @@ var _ = Describe("Loggregator Emitter", func() { It("does not emit a log message if the appID is empty", func() { logClient := NewSpyLogClient() - emitter := syslog.NewLoggregatorEmitter(logClient, "0") + emitter := syslog.NewAppLogEmitter(logClient, "0") - emitter.WriteLog("", "some-message") + emitter.EmitLog("", "some-message") messages := logClient.message() appIDs := logClient.appID() diff --git a/src/pkg/egress/syslog/retry_writer.go b/src/pkg/egress/syslog/retry_writer.go index bf2b6f4df..055fc0490 100644 --- a/src/pkg/egress/syslog/retry_writer.go +++ b/src/pkg/egress/syslog/retry_writer.go @@ -59,7 +59,7 @@ func (r *RetryWriter) Write(e *loggregator_v2.Envelope) error { sleepDuration := r.retryDuration(i) log.Printf(logTemplate, r.binding.URL.Host, sleepDuration, err) - r.emitter.WriteLog(e.SourceId, fmt.Sprintf(logTemplate, r.binding.URL.Host, sleepDuration, err)) + r.emitter.EmitLog(e.SourceId, fmt.Sprintf(logTemplate, r.binding.URL.Host, sleepDuration, err)) time.Sleep(sleepDuration) } diff --git a/src/pkg/egress/syslog/syslog_connector.go b/src/pkg/egress/syslog/syslog_connector.go index 7a5b216c6..066e37669 100644 --- a/src/pkg/egress/syslog/syslog_connector.go +++ b/src/pkg/egress/syslog/syslog_connector.go @@ -44,7 +44,7 @@ type SyslogConnector struct { droppedMetric metrics.Counter - loggregatorEmitter AppLogEmitter + appLogEmitter AppLogEmitter } // NewSyslogConnector configures and returns a new SyslogConnector. @@ -78,11 +78,11 @@ func NewSyslogConnector( // ConnectorOption allows a syslog connector to be customized. type ConnectorOption func(*SyslogConnector) -// WithLoggregatorEmitter returns a ConnectorOption that will set up logging for any +// WithAppLogEmitter returns a ConnectorOption that will set up logging for any // information about a binding. -func WithLoggregatorEmitter(emitter AppLogEmitter) ConnectorOption { +func WithAppLogEmitter(emitter AppLogEmitter) ConnectorOption { return func(sc *SyslogConnector) { - sc.loggregatorEmitter = emitter + sc.appLogEmitter = emitter } } @@ -94,7 +94,7 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer return nil, err } - writer, err := w.writerFactory.NewWriter(urlBinding, w.loggregatorEmitter) + writer, err := w.writerFactory.NewWriter(urlBinding, w.appLogEmitter) if err != nil { return nil, err } @@ -122,7 +122,7 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer w.droppedMetric.Add(float64(missed)) drainDroppedMetric.Add(float64(missed)) - w.loggregatorEmitter.WriteLog(b.AppId, fmt.Sprintf("%d messages lost for application %s in user provided syslog drain with url %s", missed, b.AppId, anonymousUrl.String())) + w.appLogEmitter.EmitLog(b.AppId, fmt.Sprintf("%d messages lost for application %s in user provided syslog drain with url %s", missed, b.AppId, anonymousUrl.String())) w.emitStandardOutErrorLog(b.AppId, urlBinding.Scheme(), anonymousUrl.String(), missed) }), w.wg) diff --git a/src/pkg/egress/syslog/syslog_connector_test.go b/src/pkg/egress/syslog/syslog_connector_test.go index a97e43d15..dbe6c7148 100644 --- a/src/pkg/egress/syslog/syslog_connector_test.go +++ b/src/pkg/egress/syslog/syslog_connector_test.go @@ -178,7 +178,7 @@ var _ = Describe("SyslogConnector", func() { spyWaitGroup, writerFactory, sm, - syslog.WithLoggregatorEmitter(syslog.NewLoggregatorEmitter(logClient, "3")), + syslog.WithAppLogEmitter(syslog.NewAppLogEmitter(logClient, "3")), ) binding := syslog.Binding{AppId: "app-id", @@ -220,7 +220,7 @@ var _ = Describe("SyslogConnector", func() { spyWaitGroup, writerFactory, sm, - syslog.WithLoggregatorEmitter(syslog.NewLoggregatorEmitter(logClient, "3")), + syslog.WithAppLogEmitter(syslog.NewAppLogEmitter(logClient, "3")), ) binding := syslog.Binding{Drain: syslog.Drain{Url: "dropping://"}} diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index 3a0b73958..4d3e0e0da 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -64,7 +64,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding, emitter AppLogEmitter) (egress. if err != nil { errorMessage := err.Error() err = NewWriterFactoryErrorf(ub.URL, "failed to load certificate: %s", errorMessage) - f.emitter.WriteLog(ub.AppID, fmt.Sprintf("failed to load certificate: %s", errorMessage)) + f.emitter.EmitLog(ub.AppID, fmt.Sprintf("failed to load certificate: %s", errorMessage)) return nil, err } tlsCfg.Certificates = []tls.Certificate{cert} @@ -73,7 +73,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding, emitter AppLogEmitter) (egress. ok := tlsCfg.RootCAs.AppendCertsFromPEM(ub.CA) if !ok { err := NewWriterFactoryErrorf(ub.URL, "failed to load root CA") - f.emitter.WriteLog(ub.AppID, "failed to load root CA") + f.emitter.EmitLog(ub.AppID, "failed to load root CA") return nil, err } } diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher.go b/src/pkg/ingress/bindings/filtered_binding_fetcher.go index 2a7a8d30d..db400de2b 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher.go @@ -90,14 +90,14 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { if invalidScheme(u.Scheme) { f.printWarning("Invalid scheme %s in syslog drain url %s for application %s", u.Scheme, anonymousUrl.String(), b.AppId) - f.emitter.WriteLog(b.AppId, fmt.Sprintf("Invalid scheme %s in syslog drain url %s", u.Scheme, anonymousUrl.String())) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("Invalid scheme %s in syslog drain url %s", u.Scheme, anonymousUrl.String())) continue } if len(u.Host) == 0 { invalidDrains += 1 f.printWarning("No hostname found in syslog drain url %s for application %s", anonymousUrl.String(), b.AppId) - f.emitter.WriteLog(b.AppId, fmt.Sprintf("No hostname found in syslog drain url %s", anonymousUrl.String())) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("No hostname found in syslog drain url %s", anonymousUrl.String())) continue } @@ -105,7 +105,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { if exists { invalidDrains += 1 f.printWarning("Skipped resolve ip address for syslog drain with url %s for application %s due to prior failure", anonymousUrl.String(), b.AppId) - f.emitter.WriteLog(b.AppId, fmt.Sprintf("Skipped resolve ip address for syslog drain with url %s due to prior failure", anonymousUrl.String())) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("Skipped resolve ip address for syslog drain with url %s due to prior failure", anonymousUrl.String())) continue } @@ -114,7 +114,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { invalidDrains += 1 f.failedHostsCache.Set(u.Host, true) f.printWarning("Cannot resolve ip address for syslog drain with url %s for application %s", anonymousUrl.String(), b.AppId) - f.emitter.WriteLog(b.AppId, fmt.Sprintf("Cannot resolve ip address for syslog drain with url %s", anonymousUrl.String())) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("Cannot resolve ip address for syslog drain with url %s", anonymousUrl.String())) continue } @@ -123,7 +123,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) { invalidDrains += 1 blacklistedDrains += 1 f.printWarning("Resolved ip address for syslog drain with url %s for application %s is blacklisted", anonymousUrl.String(), b.AppId) - f.emitter.WriteLog(b.AppId, fmt.Sprintf("Resolved ip address for syslog drain with url %s is blacklisted", anonymousUrl.String())) + f.emitter.EmitLog(b.AppId, fmt.Sprintf("Resolved ip address for syslog drain with url %s is blacklisted", anonymousUrl.String())) continue } From 9f789db548342896fb90af564c66c50d7ba3a9bf Mon Sep 17 00:00:00 2001 From: Maximilian Stefanac <137301184+corporatemax@users.noreply.github.com> Date: Wed, 27 Nov 2024 11:48:58 +0100 Subject: [PATCH 6/6] move spyLogClient to testhelper package --- src/internal/testhelper/spy_log_client.go | 82 +++++++++++++++++++ src/pkg/egress/syslog/app_log_emitter_test.go | 17 ++-- src/pkg/egress/syslog/retry_writer_test.go | 77 ----------------- .../egress/syslog/syslog_connector_test.go | 23 +++--- 4 files changed, 103 insertions(+), 96 deletions(-) create mode 100644 src/internal/testhelper/spy_log_client.go diff --git a/src/internal/testhelper/spy_log_client.go b/src/internal/testhelper/spy_log_client.go new file mode 100644 index 000000000..0faba6b91 --- /dev/null +++ b/src/internal/testhelper/spy_log_client.go @@ -0,0 +1,82 @@ +package testhelper + +import ( + "code.cloudfoundry.org/go-loggregator/v10" + v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" + "sync" +) + +type spyLogClient struct { + mu sync.Mutex + _message []string + _appID []string + + // We use maps to ensure that we can query the keys + _sourceType map[string]struct{} + _sourceInstance map[string]struct{} +} + +func NewSpyLogClient() *spyLogClient { + return &spyLogClient{ + _sourceType: make(map[string]struct{}), + _sourceInstance: make(map[string]struct{}), + } +} + +func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { + s.mu.Lock() + defer s.mu.Unlock() + + env := &v2.Envelope{ + Tags: make(map[string]string), + } + + for _, o := range opts { + o(env) + } + + s._message = append(s._message, message) + s._appID = append(s._appID, env.SourceId) + s._sourceType[env.GetTags()["source_type"]] = struct{}{} + s._sourceInstance[env.GetInstanceId()] = struct{}{} +} + +func (s *spyLogClient) Message() []string { + s.mu.Lock() + defer s.mu.Unlock() + + return s._message +} + +func (s *spyLogClient) AppID() []string { + s.mu.Lock() + defer s.mu.Unlock() + + return s._appID +} + +func (s *spyLogClient) SourceType() map[string]struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + // Copy map so the orig does not escape the mutex and induce a race. + m := make(map[string]struct{}) + for k := range s._sourceType { + m[k] = struct{}{} + } + + return m +} + +func (s *spyLogClient) SourceInstance() map[string]struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + // Copy map so the orig does not escape the mutex and induce a race. + m := make(map[string]struct{}) + for k := range s._sourceInstance { + m[k] = struct{}{} + } + + return m +} diff --git a/src/pkg/egress/syslog/app_log_emitter_test.go b/src/pkg/egress/syslog/app_log_emitter_test.go index 83904f80c..18873410d 100644 --- a/src/pkg/egress/syslog/app_log_emitter_test.go +++ b/src/pkg/egress/syslog/app_log_emitter_test.go @@ -1,6 +1,7 @@ package syslog_test import ( + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -9,14 +10,14 @@ import ( var _ = Describe("Loggregator Emitter", func() { Describe("EmitLog()", func() { It("emits a log message", func() { - logClient := NewSpyLogClient() + logClient := testhelper.NewSpyLogClient() emitter := syslog.NewAppLogEmitter(logClient, "0") emitter.EmitLog("app-id", "some-message") - messages := logClient.message() - appIDs := logClient.appID() - sourceTypes := logClient.sourceType() + messages := logClient.Message() + appIDs := logClient.AppID() + sourceTypes := logClient.SourceType() Expect(messages).To(HaveLen(2)) Expect(messages[0]).To(Equal("some-message")) Expect(messages[1]).To(Equal("some-message")) @@ -27,14 +28,14 @@ var _ = Describe("Loggregator Emitter", func() { }) It("does not emit a log message if the appID is empty", func() { - logClient := NewSpyLogClient() + logClient := testhelper.NewSpyLogClient() emitter := syslog.NewAppLogEmitter(logClient, "0") emitter.EmitLog("", "some-message") - messages := logClient.message() - appIDs := logClient.appID() - sourceTypes := logClient.sourceType() + messages := logClient.Message() + appIDs := logClient.AppID() + sourceTypes := logClient.SourceType() Expect(messages).To(HaveLen(0)) Expect(appIDs).To(HaveLen(0)) Expect(sourceTypes).ToNot(HaveKey("LGR")) diff --git a/src/pkg/egress/syslog/retry_writer_test.go b/src/pkg/egress/syslog/retry_writer_test.go index 89a769c38..03063506f 100644 --- a/src/pkg/egress/syslog/retry_writer_test.go +++ b/src/pkg/egress/syslog/retry_writer_test.go @@ -3,11 +3,9 @@ package syslog_test import ( "errors" "net/url" - "sync" "sync/atomic" "time" - "code.cloudfoundry.org/go-loggregator/v10" v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" @@ -165,81 +163,6 @@ func (s *spyWriteCloser) WriteAttempts() int { return int(atomic.LoadInt64(&s.writeAttempts)) } -type spyLogClient struct { - mu sync.Mutex - _message []string - _appID []string - - // We use maps to ensure that we can query the keys - _sourceType map[string]struct{} - _sourceInstance map[string]struct{} -} - -func NewSpyLogClient() *spyLogClient { - return &spyLogClient{ - _sourceType: make(map[string]struct{}), - _sourceInstance: make(map[string]struct{}), - } -} - -func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { - s.mu.Lock() - defer s.mu.Unlock() - - env := &v2.Envelope{ - Tags: make(map[string]string), - } - - for _, o := range opts { - o(env) - } - - s._message = append(s._message, message) - s._appID = append(s._appID, env.SourceId) - s._sourceType[env.GetTags()["source_type"]] = struct{}{} - s._sourceInstance[env.GetInstanceId()] = struct{}{} -} - -func (s *spyLogClient) message() []string { - s.mu.Lock() - defer s.mu.Unlock() - - return s._message -} - -func (s *spyLogClient) appID() []string { - s.mu.Lock() - defer s.mu.Unlock() - - return s._appID -} - -func (s *spyLogClient) sourceType() map[string]struct{} { - s.mu.Lock() - defer s.mu.Unlock() - - // Copy map so the orig does not escape the mutex and induce a race. - m := make(map[string]struct{}) - for k := range s._sourceType { - m[k] = struct{}{} - } - - return m -} - -func (s *spyLogClient) sourceInstance() map[string]struct{} { - s.mu.Lock() - defer s.mu.Unlock() - - // Copy map so the orig does not escape the mutex and induce a race. - m := make(map[string]struct{}) - for k := range s._sourceInstance { - m[k] = struct{}{} - } - - return m -} - func buildDelay(multiplier time.Duration) func(int) time.Duration { return func(attempt int) time.Duration { return time.Duration(attempt) * multiplier diff --git a/src/pkg/egress/syslog/syslog_connector_test.go b/src/pkg/egress/syslog/syslog_connector_test.go index dbe6c7148..46f553f01 100644 --- a/src/pkg/egress/syslog/syslog_connector_test.go +++ b/src/pkg/egress/syslog/syslog_connector_test.go @@ -1,6 +1,7 @@ package syslog_test import ( + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" "errors" "fmt" "io" @@ -172,7 +173,7 @@ var _ = Describe("SyslogConnector", func() { }) It("emits a LGR and SYS log to the log client about logs that have been dropped", func() { - logClient := NewSpyLogClient() + logClient := testhelper.NewSpyLogClient() connector := syslog.NewSyslogConnector( true, spyWaitGroup, @@ -201,20 +202,20 @@ var _ = Describe("SyslogConnector", func() { } }(writer) - Eventually(logClient.message).Should(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) - Eventually(logClient.appID).Should(ContainElement("app-id")) + Eventually(logClient.Message).Should(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) + Eventually(logClient.AppID).Should(ContainElement("app-id")) - Eventually(logClient.sourceType).Should(HaveLen(2)) - Eventually(logClient.sourceType).Should(HaveKey("LGR")) - Eventually(logClient.sourceType).Should(HaveKey("SYS")) + Eventually(logClient.SourceType).Should(HaveLen(2)) + Eventually(logClient.SourceType).Should(HaveKey("LGR")) + Eventually(logClient.SourceType).Should(HaveKey("SYS")) - Eventually(logClient.sourceInstance).Should(HaveLen(2)) - Eventually(logClient.sourceInstance).Should(HaveKey("")) - Eventually(logClient.sourceInstance).Should(HaveKey("3")) + Eventually(logClient.SourceInstance).Should(HaveLen(2)) + Eventually(logClient.SourceInstance).Should(HaveKey("")) + Eventually(logClient.SourceInstance).Should(HaveKey("3")) }) It("doesn't emit LGR and SYS log to the log client about aggregate drains drops", func() { - logClient := NewSpyLogClient() + logClient := testhelper.NewSpyLogClient() connector := syslog.NewSyslogConnector( true, spyWaitGroup, @@ -239,7 +240,7 @@ var _ = Describe("SyslogConnector", func() { } }(writer) - Consistently(logClient.message).ShouldNot(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) + Consistently(logClient.Message()).ShouldNot(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) }) It("does not panic on unknown dropped metrics", func() {