Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syslog drain app error messages in app log stream #633

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions src/cmd/syslog-agent/app/syslog_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,6 @@ func NewSyslogAgent(
m Metrics,
l *log.Logger,
) *SyslogAgent {
internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg)
writerFactory := syslog.NewWriterFactory(
internalTlsConfig,
externalTlsConfig,
syslog.NetworkTimeoutConfig{
Keepalive: 10 * time.Second,
DialTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
},
m,
)

ingressTLSConfig, err := loggregator.NewIngressTLSConfig(
cfg.GRPC.CAFile,
cfg.GRPC.CertFile,
Expand All @@ -86,12 +74,25 @@ func NewSyslogAgent(
l.Panicf("failed to create log client for syslog connector: %q", err)
}

internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg)
writerFactory := syslog.NewWriterFactory(
internalTlsConfig,
externalTlsConfig,
syslog.NetworkTimeoutConfig{
Keepalive: 10 * time.Second,
DialTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
},
m,
syslog.NewAppLogEmitter(logClient, "syslog_agent"),
)

connector := syslog.NewSyslogConnector(
cfg.DrainSkipCertVerify,
timeoutwaitgroup.New(time.Minute),
writerFactory,
m,
syslog.WithLogClient(logClient, "syslog_agent"),
syslog.WithAppLogEmitter(syslog.NewAppLogEmitter(logClient, "syslog_agent")),
)

var cacheClient *cache.CacheClient
Expand All @@ -112,6 +113,7 @@ func NewSyslogAgent(
m,
cfg.WarnOnInvalidDrains,
l,
syslog.NewAppLogEmitter(logClient, "syslog_agent"),
)
cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata)
}
Expand Down
82 changes: 82 additions & 0 deletions src/internal/testhelper/spy_log_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package testhelper

import (
"code.cloudfoundry.org/go-loggregator/v10"
v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
"sync"
)

type spyLogClient struct {
mu sync.Mutex
_message []string
_appID []string

// We use maps to ensure that we can query the keys
_sourceType map[string]struct{}
_sourceInstance map[string]struct{}
}

func NewSpyLogClient() *spyLogClient {
return &spyLogClient{
_sourceType: make(map[string]struct{}),
_sourceInstance: make(map[string]struct{}),
}
}

func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) {
s.mu.Lock()
defer s.mu.Unlock()

env := &v2.Envelope{
Tags: make(map[string]string),
}

for _, o := range opts {
o(env)
}

s._message = append(s._message, message)
s._appID = append(s._appID, env.SourceId)
s._sourceType[env.GetTags()["source_type"]] = struct{}{}
s._sourceInstance[env.GetInstanceId()] = struct{}{}
}

func (s *spyLogClient) Message() []string {
s.mu.Lock()
defer s.mu.Unlock()

return s._message
}

func (s *spyLogClient) AppID() []string {
s.mu.Lock()
defer s.mu.Unlock()

return s._appID
}

func (s *spyLogClient) SourceType() map[string]struct{} {
s.mu.Lock()
defer s.mu.Unlock()

// Copy map so the orig does not escape the mutex and induce a race.
m := make(map[string]struct{})
for k := range s._sourceType {
m[k] = struct{}{}
}

return m
}

func (s *spyLogClient) SourceInstance() map[string]struct{} {
s.mu.Lock()
defer s.mu.Unlock()

// Copy map so the orig does not escape the mutex and induce a race.
m := make(map[string]struct{})
for k := range s._sourceInstance {
m[k] = struct{}{}
}

return m
}
40 changes: 40 additions & 0 deletions src/pkg/egress/syslog/app_log_emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package syslog

import (
"code.cloudfoundry.org/go-loggregator/v10"
)

// LogClient is used to emit logs.
type LogClient interface {
EmitLog(message string, opts ...loggregator.EmitLogOption)
}

type AppLogEmitter struct {
logClient LogClient
sourceIndex string
}

// EmitLog writes a message in the application log stream using a LogClient.
func (appLogEmitter *AppLogEmitter) EmitLog(appID string, message string) {
if appLogEmitter.logClient == nil || appID == "" {
return
}

option := loggregator.WithAppInfo(appID, "LGR", "")
appLogEmitter.logClient.EmitLog(message, option)

option = loggregator.WithAppInfo(
appID,
"SYS",
appLogEmitter.sourceIndex,
)
appLogEmitter.logClient.EmitLog(message, option)
}

// NewAppLogEmitter creates a new AppLogEmitter.
func NewAppLogEmitter(logClient LogClient, sourceIndex string) AppLogEmitter {
return AppLogEmitter{
logClient: logClient,
sourceIndex: sourceIndex,
}
}
45 changes: 45 additions & 0 deletions src/pkg/egress/syslog/app_log_emitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package syslog_test

import (
"code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Loggregator Emitter", func() {
Describe("EmitLog()", func() {
It("emits a log message", func() {
logClient := testhelper.NewSpyLogClient()
emitter := syslog.NewAppLogEmitter(logClient, "0")

emitter.EmitLog("app-id", "some-message")

messages := logClient.Message()
appIDs := logClient.AppID()
sourceTypes := logClient.SourceType()
Expect(messages).To(HaveLen(2))
Expect(messages[0]).To(Equal("some-message"))
Expect(messages[1]).To(Equal("some-message"))
Expect(appIDs[0]).To(Equal("app-id"))
Expect(appIDs[1]).To(Equal("app-id"))
Expect(sourceTypes).To(HaveKey("LGR"))
Expect(sourceTypes).To(HaveKey("SYS"))
})

It("does not emit a log message if the appID is empty", func() {
logClient := testhelper.NewSpyLogClient()
emitter := syslog.NewAppLogEmitter(logClient, "0")

emitter.EmitLog("", "some-message")

messages := logClient.Message()
appIDs := logClient.AppID()
sourceTypes := logClient.SourceType()
Expect(messages).To(HaveLen(0))
Expect(appIDs).To(HaveLen(0))
Expect(sourceTypes).ToNot(HaveKey("LGR"))
Expect(sourceTypes).ToNot(HaveKey("SYS"))
})
})
})
5 changes: 5 additions & 0 deletions src/pkg/egress/syslog/retry_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package syslog

import (
"fmt"
"log"
"math"
"time"
Expand All @@ -21,19 +22,22 @@ type RetryWriter struct {
retryDuration RetryDuration
maxRetries int
binding *URLBinding
emitter AppLogEmitter
}

func NewRetryWriter(
urlBinding *URLBinding,
retryDuration RetryDuration,
maxRetries int,
writer egress.WriteCloser,
emitter AppLogEmitter,
) (egress.WriteCloser, error) {
return &RetryWriter{
Writer: writer,
retryDuration: retryDuration,
maxRetries: maxRetries,
binding: urlBinding,
emitter: emitter,
}, nil
}

Expand All @@ -55,6 +59,7 @@ func (r *RetryWriter) Write(e *loggregator_v2.Envelope) error {

sleepDuration := r.retryDuration(i)
log.Printf(logTemplate, r.binding.URL.Host, sleepDuration, err)
r.emitter.EmitLog(e.SourceId, fmt.Sprintf(logTemplate, r.binding.URL.Host, sleepDuration, err))

time.Sleep(sleepDuration)
}
Expand Down
78 changes: 1 addition & 77 deletions src/pkg/egress/syslog/retry_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package syslog_test
import (
"errors"
"net/url"
"sync"
"sync/atomic"
"time"

"code.cloudfoundry.org/go-loggregator/v10"
v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
Expand Down Expand Up @@ -165,81 +163,6 @@ func (s *spyWriteCloser) WriteAttempts() int {
return int(atomic.LoadInt64(&s.writeAttempts))
}

type spyLogClient struct {
mu sync.Mutex
_message []string
_appID []string

// We use maps to ensure that we can query the keys
_sourceType map[string]struct{}
_sourceInstance map[string]struct{}
}

func newSpyLogClient() *spyLogClient {
return &spyLogClient{
_sourceType: make(map[string]struct{}),
_sourceInstance: make(map[string]struct{}),
}
}

func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) {
s.mu.Lock()
defer s.mu.Unlock()

env := &v2.Envelope{
Tags: make(map[string]string),
}

for _, o := range opts {
o(env)
}

s._message = append(s._message, message)
s._appID = append(s._appID, env.SourceId)
s._sourceType[env.GetTags()["source_type"]] = struct{}{}
s._sourceInstance[env.GetInstanceId()] = struct{}{}
}

func (s *spyLogClient) message() []string {
s.mu.Lock()
defer s.mu.Unlock()

return s._message
}

func (s *spyLogClient) appID() []string {
s.mu.Lock()
defer s.mu.Unlock()

return s._appID
}

func (s *spyLogClient) sourceType() map[string]struct{} {
s.mu.Lock()
defer s.mu.Unlock()

// Copy map so the orig does not escape the mutex and induce a race.
m := make(map[string]struct{})
for k := range s._sourceType {
m[k] = struct{}{}
}

return m
}

func (s *spyLogClient) sourceInstance() map[string]struct{} {
s.mu.Lock()
defer s.mu.Unlock()

// Copy map so the orig does not escape the mutex and induce a race.
m := make(map[string]struct{})
for k := range s._sourceInstance {
m[k] = struct{}{}
}

return m
}

func buildDelay(multiplier time.Duration) func(int) time.Duration {
return func(attempt int) time.Duration {
return time.Duration(attempt) * multiplier
Expand All @@ -257,5 +180,6 @@ func buildRetryWriter(
syslog.RetryDuration(buildDelay(delayMultiplier)),
maxRetries,
w,
syslog.AppLogEmitter{},
)
}
Loading
Loading