Skip to content

Commit

Permalink
Support customization of timestamp format
Browse files Browse the repository at this point in the history
By default, RFC3339 timestamps are used, but our application uses a
custom format. This commit enables us to set the format in a consistent
manner.

Closes #131
  • Loading branch information
stanhu committed Feb 12, 2021
1 parent 912313c commit 5431b3e
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 117 deletions.
23 changes: 16 additions & 7 deletions logging/kit/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ import (

var (
defaultOptions = &options{
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
timestampFormat: time.RFC3339,
}
)

type options struct {
levelFunc CodeToLevel
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
levelFunc CodeToLevel
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
timestampFormat string
}

type Option func(*options)
Expand Down Expand Up @@ -80,6 +82,13 @@ func WithDurationField(f DurationToField) Option {
}
}

// WithTimestampFormat customizes the timestamps emitted in the log fields.
func WithTimestampFormat(format string) Option {
return func(o *options) {
o.timestampFormat = format
}
}

// DefaultCodeToLevel is the default implementation of gRPC return codes and interceptor log level for server side.
func DefaultCodeToLevel(code codes.Code, logger log.Logger) log.Logger {
switch code {
Expand Down
10 changes: 5 additions & 5 deletions logging/kit/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func UnaryServerInterceptor(logger log.Logger, opts ...Option) grpc.UnaryServerI
o := evaluateServerOpt(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
newCtx := injectLogger(ctx, logger, info.FullMethod, startTime)
newCtx := injectLogger(ctx, logger, info.FullMethod, startTime, o.timestampFormat)

resp, err := handler(newCtx, req)
if !o.shouldLog(info.FullMethod, err) {
Expand All @@ -44,7 +44,7 @@ func StreamServerInterceptor(logger log.Logger, opts ...Option) grpc.StreamServe
o := evaluateServerOpt(opts)
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
newCtx := injectLogger(stream.Context(), logger, info.FullMethod, startTime)
newCtx := injectLogger(stream.Context(), logger, info.FullMethod, startTime, o.timestampFormat)

wrapped := grpc_middleware.WrapServerStream(stream)
wrapped.WrappedContext = newCtx
Expand All @@ -61,11 +61,11 @@ func StreamServerInterceptor(logger log.Logger, opts ...Option) grpc.StreamServe
}
}

func injectLogger(ctx context.Context, logger log.Logger, fullMethodString string, start time.Time) context.Context {
func injectLogger(ctx context.Context, logger log.Logger, fullMethodString string, start time.Time, timestampFormat string) context.Context {
f := ctxkit.TagsToFields(ctx)
f = append(f, "grpc.start_time", start.Format(time.RFC3339))
f = append(f, "grpc.start_time", start.Format(timestampFormat))
if d, ok := ctx.Deadline(); ok {
f = append(f, "grpc.request.deadline", d.Format(time.RFC3339))
f = append(f, "grpc.request.deadline", d.Format(timestampFormat))
}
f = append(f, serverCallFields(fullMethodString)...)
callLog := log.With(logger, f...)
Expand Down
57 changes: 36 additions & 21 deletions logging/kit/server_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,34 @@ func TestKitLoggingSuite(t *testing.T) {
t.Skipf("Skipping due to json.RawMessage incompatibility with go1.7")
return
}
opts := []grpc_kit.Option{
grpc_kit.WithLevels(customCodeToLevel),
}
b := newKitBaseSuite(t)
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_kit.StreamServerInterceptor(b.logger, opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_kit.UnaryServerInterceptor(b.logger, opts...)),

for _, tcase := range []struct {
timestampFormat string
}{
{
timestampFormat: time.RFC3339,
},
{
timestampFormat: "2006-01-02",
},
} {
opts := []grpc_kit.Option{
grpc_kit.WithLevels(customCodeToLevel),
grpc_kit.WithTimestampFormat(tcase.timestampFormat),
}

b := newKitBaseSuite(t)
b.timestampFormat = tcase.timestampFormat
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_kit.StreamServerInterceptor(b.logger, opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_kit.UnaryServerInterceptor(b.logger, opts...)),
}
suite.Run(t, &kitServerSuite{b})
}
suite.Run(t, &kitServerSuite{b})
}

type kitServerSuite struct {
Expand All @@ -69,13 +84,13 @@ func (s *kitServerSuite) TestPing_WithCustomTags() {

assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int`")
require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time")
_, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")
_, err := time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time")

require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call")
_, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline as RFC3339")
assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(time.RFC3339), "should have the same deadline that was set by the caller")
_, err = time.Parse(s.timestampFormat, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline")
assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(s.timestampFormat), "should have the same deadline that was set by the caller")
}

assert.Equal(s.T(), msgs[0]["msg"], "some ping", "handler's message must contain user message")
Expand Down Expand Up @@ -129,8 +144,8 @@ func (s *kitServerSuite) TestPingError_WithCustomLevels() {
assert.Equal(s.T(), m["msg"], "finished unary call with code "+tcase.code.String(), "needs the correct end message")

require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time")
_, err = time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")
_, err = time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time")
}
}

Expand All @@ -156,8 +171,8 @@ func (s *kitServerSuite) TestPingList_WithCustomTags() {

assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int` set by AddFields")
require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time")
_, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")
_, err := time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time")
}

assert.Equal(s.T(), msgs[0]["msg"], "some pinglist", "handler's message must contain user message")
Expand Down
7 changes: 4 additions & 3 deletions logging/kit/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ func (s *loggingPingService) PingEmpty(ctx context.Context, empty *pb_testproto.

type kitBaseSuite struct {
*grpc_testing.InterceptorTestSuite
mutexBuffer *grpc_testing.MutexReadWriter
buffer *bytes.Buffer
logger log.Logger
mutexBuffer *grpc_testing.MutexReadWriter
buffer *bytes.Buffer
logger log.Logger
timestampFormat string
}

func newKitBaseSuite(t *testing.T) *kitBaseSuite {
Expand Down
29 changes: 19 additions & 10 deletions logging/logrus/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ import (

var (
defaultOptions = &options{
levelFunc: nil,
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
messageFunc: DefaultMessageProducer,
levelFunc: nil,
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
messageFunc: DefaultMessageProducer,
timestampFormat: time.RFC3339,
}
)

type options struct {
levelFunc CodeToLevel
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
messageFunc MessageProducer
levelFunc CodeToLevel
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
messageFunc MessageProducer
timestampFormat string
}

func evaluateServerOpt(opts []Option) *options {
Expand Down Expand Up @@ -94,6 +96,13 @@ func WithMessageProducer(f MessageProducer) Option {
}
}

// WithTimestampFormat customizes the timestamps emitted in the log fields.
func WithTimestampFormat(format string) Option {
return func(o *options) {
o.timestampFormat = format
}
}

// DefaultCodeToLevel is the default implementation of gRPC return codes to log levels for server side.
func DefaultCodeToLevel(code codes.Code) logrus.Level {
switch code {
Expand Down
10 changes: 5 additions & 5 deletions logging/logrus/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func UnaryServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.UnaryServe
o := evaluateServerOpt(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
newCtx := newLoggerForCall(ctx, entry, info.FullMethod, startTime)
newCtx := newLoggerForCall(ctx, entry, info.FullMethod, startTime, o.timestampFormat)

resp, err := handler(newCtx, req)

Expand Down Expand Up @@ -54,7 +54,7 @@ func StreamServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamSer
o := evaluateServerOpt(opts)
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
newCtx := newLoggerForCall(stream.Context(), entry, info.FullMethod, startTime)
newCtx := newLoggerForCall(stream.Context(), entry, info.FullMethod, startTime, o.timestampFormat)
wrapped := grpc_middleware.WrapServerStream(stream)
wrapped.WrappedContext = newCtx

Expand All @@ -76,7 +76,7 @@ func StreamServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamSer
}
}

func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString string, start time.Time) context.Context {
func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString string, start time.Time, timestampFormat string) context.Context {
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
callLog := entry.WithFields(
Expand All @@ -85,13 +85,13 @@ func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString
KindField: "server",
"grpc.service": service,
"grpc.method": method,
"grpc.start_time": start.Format(time.RFC3339),
"grpc.start_time": start.Format(timestampFormat),
})

if d, ok := ctx.Deadline(); ok {
callLog = callLog.WithFields(
logrus.Fields{
"grpc.request.deadline": d.Format(time.RFC3339),
"grpc.request.deadline": d.Format(timestampFormat),
})
}

Expand Down
62 changes: 38 additions & 24 deletions logging/logrus/server_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,33 @@ import (
)

func TestLogrusServerSuite(t *testing.T) {
opts := []grpc_logrus.Option{
grpc_logrus.WithLevels(customCodeToLevel),
}
b := newLogrusBaseSuite(t)
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.StreamServerInterceptor(logrus.NewEntry(b.logger), opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.UnaryServerInterceptor(logrus.NewEntry(b.logger), opts...)),
for _, tcase := range []struct {
timestampFormat string
}{
{
timestampFormat: time.RFC3339,
},
{
timestampFormat: "2006-01-02",
},
} {
opts := []grpc_logrus.Option{
grpc_logrus.WithLevels(customCodeToLevel),
grpc_logrus.WithTimestampFormat(tcase.timestampFormat),
}

b := newLogrusBaseSuite(t)
b.timestampFormat = tcase.timestampFormat
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.StreamServerInterceptor(logrus.NewEntry(b.logger), opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.UnaryServerInterceptor(logrus.NewEntry(b.logger), opts...)),
}
suite.Run(t, &logrusServerSuite{b})
}
suite.Run(t, &logrusServerSuite{b})
}

type logrusServerSuite struct {
Expand All @@ -55,13 +69,13 @@ func (s *logrusServerSuite) TestPing_WithCustomTags() {

assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int`")
require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time of the call")
_, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")
_, err := time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time")

require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call")
_, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline as RFC3339")
assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(time.RFC3339), "should have the same deadline that was set by the caller")
_, err = time.Parse(s.timestampFormat, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline")
assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(s.timestampFormat), "should have the same deadline that was set by the caller")
}

assert.Equal(s.T(), msgs[0]["msg"], "some ping", "first message must contain the correct user message")
Expand Down Expand Up @@ -114,12 +128,12 @@ func (s *logrusServerSuite) TestPingError_WithCustomLevels() {
assert.Equal(s.T(), m["msg"], "finished unary call with code "+tcase.code.String(), "must have the correct finish message")

require.Contains(s.T(), m, "grpc.start_time", "all lines must contain a start time for the call")
_, err = time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse the start time as RFC3339")
_, err = time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse the start time")

require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call")
_, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline as RFC3339")
_, err = time.Parse(s.timestampFormat, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline")
}
}

Expand All @@ -145,12 +159,12 @@ func (s *logrusServerSuite) TestPingList_WithCustomTags() {

assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int`")
require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time for the call")
_, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string))
_, err := time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")

require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call")
_, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline as RFC3339")
_, err = time.Parse(s.timestampFormat, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline")
}

assert.Equal(s.T(), msgs[0]["msg"], "some pinglist", "msg must be the correct message")
Expand Down
7 changes: 4 additions & 3 deletions logging/logrus/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ func (s *loggingPingService) PingEmpty(ctx context.Context, empty *pb_testproto.

type logrusBaseSuite struct {
*grpc_testing.InterceptorTestSuite
mutexBuffer *grpc_testing.MutexReadWriter
buffer *bytes.Buffer
logger *logrus.Logger
mutexBuffer *grpc_testing.MutexReadWriter
buffer *bytes.Buffer
logger *logrus.Logger
timestampFormat string
}

func newLogrusBaseSuite(t *testing.T) *logrusBaseSuite {
Expand Down
Loading

0 comments on commit 5431b3e

Please sign in to comment.