Skip to content

Commit

Permalink
Support customization of timestamp format (#398)
Browse files Browse the repository at this point in the history
By default, RFC3339 timestamps are used, but our application uses a
custom format. This commit enables us to set the format in a consistent
manner.

Closes #131
  • Loading branch information
stanhu authored Feb 13, 2021
1 parent 912313c commit be4c235
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 117 deletions.
23 changes: 16 additions & 7 deletions logging/kit/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ import (

var (
defaultOptions = &options{
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
timestampFormat: time.RFC3339,
}
)

type options struct {
levelFunc CodeToLevel
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
levelFunc CodeToLevel
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
timestampFormat string
}

type Option func(*options)
Expand Down Expand Up @@ -80,6 +82,13 @@ func WithDurationField(f DurationToField) Option {
}
}

// WithTimestampFormat customizes the timestamps emitted in the log fields.
func WithTimestampFormat(format string) Option {
return func(o *options) {
o.timestampFormat = format
}
}

// DefaultCodeToLevel is the default implementation of gRPC return codes and interceptor log level for server side.
func DefaultCodeToLevel(code codes.Code, logger log.Logger) log.Logger {
switch code {
Expand Down
10 changes: 5 additions & 5 deletions logging/kit/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func UnaryServerInterceptor(logger log.Logger, opts ...Option) grpc.UnaryServerI
o := evaluateServerOpt(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
newCtx := injectLogger(ctx, logger, info.FullMethod, startTime)
newCtx := injectLogger(ctx, logger, info.FullMethod, startTime, o.timestampFormat)

resp, err := handler(newCtx, req)
if !o.shouldLog(info.FullMethod, err) {
Expand All @@ -44,7 +44,7 @@ func StreamServerInterceptor(logger log.Logger, opts ...Option) grpc.StreamServe
o := evaluateServerOpt(opts)
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
newCtx := injectLogger(stream.Context(), logger, info.FullMethod, startTime)
newCtx := injectLogger(stream.Context(), logger, info.FullMethod, startTime, o.timestampFormat)

wrapped := grpc_middleware.WrapServerStream(stream)
wrapped.WrappedContext = newCtx
Expand All @@ -61,11 +61,11 @@ func StreamServerInterceptor(logger log.Logger, opts ...Option) grpc.StreamServe
}
}

func injectLogger(ctx context.Context, logger log.Logger, fullMethodString string, start time.Time) context.Context {
func injectLogger(ctx context.Context, logger log.Logger, fullMethodString string, start time.Time, timestampFormat string) context.Context {
f := ctxkit.TagsToFields(ctx)
f = append(f, "grpc.start_time", start.Format(time.RFC3339))
f = append(f, "grpc.start_time", start.Format(timestampFormat))
if d, ok := ctx.Deadline(); ok {
f = append(f, "grpc.request.deadline", d.Format(time.RFC3339))
f = append(f, "grpc.request.deadline", d.Format(timestampFormat))
}
f = append(f, serverCallFields(fullMethodString)...)
callLog := log.With(logger, f...)
Expand Down
57 changes: 36 additions & 21 deletions logging/kit/server_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,34 @@ func TestKitLoggingSuite(t *testing.T) {
t.Skipf("Skipping due to json.RawMessage incompatibility with go1.7")
return
}
opts := []grpc_kit.Option{
grpc_kit.WithLevels(customCodeToLevel),
}
b := newKitBaseSuite(t)
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_kit.StreamServerInterceptor(b.logger, opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_kit.UnaryServerInterceptor(b.logger, opts...)),

for _, tcase := range []struct {
timestampFormat string
}{
{
timestampFormat: time.RFC3339,
},
{
timestampFormat: "2006-01-02",
},
} {
opts := []grpc_kit.Option{
grpc_kit.WithLevels(customCodeToLevel),
grpc_kit.WithTimestampFormat(tcase.timestampFormat),
}

b := newKitBaseSuite(t)
b.timestampFormat = tcase.timestampFormat
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_kit.StreamServerInterceptor(b.logger, opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_kit.UnaryServerInterceptor(b.logger, opts...)),
}
suite.Run(t, &kitServerSuite{b})
}
suite.Run(t, &kitServerSuite{b})
}

type kitServerSuite struct {
Expand All @@ -69,13 +84,13 @@ func (s *kitServerSuite) TestPing_WithCustomTags() {

assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int`")
require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time")
_, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")
_, err := time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time")

require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call")
_, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline as RFC3339")
assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(time.RFC3339), "should have the same deadline that was set by the caller")
_, err = time.Parse(s.timestampFormat, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline")
assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(s.timestampFormat), "should have the same deadline that was set by the caller")
}

assert.Equal(s.T(), msgs[0]["msg"], "some ping", "handler's message must contain user message")
Expand Down Expand Up @@ -129,8 +144,8 @@ func (s *kitServerSuite) TestPingError_WithCustomLevels() {
assert.Equal(s.T(), m["msg"], "finished unary call with code "+tcase.code.String(), "needs the correct end message")

require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time")
_, err = time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")
_, err = time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time")
}
}

Expand All @@ -156,8 +171,8 @@ func (s *kitServerSuite) TestPingList_WithCustomTags() {

assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int` set by AddFields")
require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time")
_, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")
_, err := time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time")
}

assert.Equal(s.T(), msgs[0]["msg"], "some pinglist", "handler's message must contain user message")
Expand Down
7 changes: 4 additions & 3 deletions logging/kit/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ func (s *loggingPingService) PingEmpty(ctx context.Context, empty *pb_testproto.

type kitBaseSuite struct {
*grpc_testing.InterceptorTestSuite
mutexBuffer *grpc_testing.MutexReadWriter
buffer *bytes.Buffer
logger log.Logger
mutexBuffer *grpc_testing.MutexReadWriter
buffer *bytes.Buffer
logger log.Logger
timestampFormat string
}

func newKitBaseSuite(t *testing.T) *kitBaseSuite {
Expand Down
29 changes: 19 additions & 10 deletions logging/logrus/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ import (

var (
defaultOptions = &options{
levelFunc: nil,
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
messageFunc: DefaultMessageProducer,
levelFunc: nil,
shouldLog: grpc_logging.DefaultDeciderMethod,
codeFunc: grpc_logging.DefaultErrorToCode,
durationFunc: DefaultDurationToField,
messageFunc: DefaultMessageProducer,
timestampFormat: time.RFC3339,
}
)

type options struct {
levelFunc CodeToLevel
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
messageFunc MessageProducer
levelFunc CodeToLevel
shouldLog grpc_logging.Decider
codeFunc grpc_logging.ErrorToCode
durationFunc DurationToField
messageFunc MessageProducer
timestampFormat string
}

func evaluateServerOpt(opts []Option) *options {
Expand Down Expand Up @@ -94,6 +96,13 @@ func WithMessageProducer(f MessageProducer) Option {
}
}

// WithTimestampFormat customizes the timestamps emitted in the log fields.
func WithTimestampFormat(format string) Option {
return func(o *options) {
o.timestampFormat = format
}
}

// DefaultCodeToLevel is the default implementation of gRPC return codes to log levels for server side.
func DefaultCodeToLevel(code codes.Code) logrus.Level {
switch code {
Expand Down
10 changes: 5 additions & 5 deletions logging/logrus/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func UnaryServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.UnaryServe
o := evaluateServerOpt(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
newCtx := newLoggerForCall(ctx, entry, info.FullMethod, startTime)
newCtx := newLoggerForCall(ctx, entry, info.FullMethod, startTime, o.timestampFormat)

resp, err := handler(newCtx, req)

Expand Down Expand Up @@ -54,7 +54,7 @@ func StreamServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamSer
o := evaluateServerOpt(opts)
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
newCtx := newLoggerForCall(stream.Context(), entry, info.FullMethod, startTime)
newCtx := newLoggerForCall(stream.Context(), entry, info.FullMethod, startTime, o.timestampFormat)
wrapped := grpc_middleware.WrapServerStream(stream)
wrapped.WrappedContext = newCtx

Expand All @@ -76,7 +76,7 @@ func StreamServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamSer
}
}

func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString string, start time.Time) context.Context {
func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString string, start time.Time, timestampFormat string) context.Context {
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
callLog := entry.WithFields(
Expand All @@ -85,13 +85,13 @@ func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString
KindField: "server",
"grpc.service": service,
"grpc.method": method,
"grpc.start_time": start.Format(time.RFC3339),
"grpc.start_time": start.Format(timestampFormat),
})

if d, ok := ctx.Deadline(); ok {
callLog = callLog.WithFields(
logrus.Fields{
"grpc.request.deadline": d.Format(time.RFC3339),
"grpc.request.deadline": d.Format(timestampFormat),
})
}

Expand Down
62 changes: 38 additions & 24 deletions logging/logrus/server_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,33 @@ import (
)

func TestLogrusServerSuite(t *testing.T) {
opts := []grpc_logrus.Option{
grpc_logrus.WithLevels(customCodeToLevel),
}
b := newLogrusBaseSuite(t)
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.StreamServerInterceptor(logrus.NewEntry(b.logger), opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.UnaryServerInterceptor(logrus.NewEntry(b.logger), opts...)),
for _, tcase := range []struct {
timestampFormat string
}{
{
timestampFormat: time.RFC3339,
},
{
timestampFormat: "2006-01-02",
},
} {
opts := []grpc_logrus.Option{
grpc_logrus.WithLevels(customCodeToLevel),
grpc_logrus.WithTimestampFormat(tcase.timestampFormat),
}

b := newLogrusBaseSuite(t)
b.timestampFormat = tcase.timestampFormat
b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.StreamServerInterceptor(logrus.NewEntry(b.logger), opts...)),
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.UnaryServerInterceptor(logrus.NewEntry(b.logger), opts...)),
}
suite.Run(t, &logrusServerSuite{b})
}
suite.Run(t, &logrusServerSuite{b})
}

type logrusServerSuite struct {
Expand All @@ -55,13 +69,13 @@ func (s *logrusServerSuite) TestPing_WithCustomTags() {

assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int`")
require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time of the call")
_, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")
_, err := time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time")

require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call")
_, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline as RFC3339")
assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(time.RFC3339), "should have the same deadline that was set by the caller")
_, err = time.Parse(s.timestampFormat, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline")
assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(s.timestampFormat), "should have the same deadline that was set by the caller")
}

assert.Equal(s.T(), msgs[0]["msg"], "some ping", "first message must contain the correct user message")
Expand Down Expand Up @@ -114,12 +128,12 @@ func (s *logrusServerSuite) TestPingError_WithCustomLevels() {
assert.Equal(s.T(), m["msg"], "finished unary call with code "+tcase.code.String(), "must have the correct finish message")

require.Contains(s.T(), m, "grpc.start_time", "all lines must contain a start time for the call")
_, err = time.Parse(time.RFC3339, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse the start time as RFC3339")
_, err = time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse the start time")

require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call")
_, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline as RFC3339")
_, err = time.Parse(s.timestampFormat, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline")
}
}

Expand All @@ -145,12 +159,12 @@ func (s *logrusServerSuite) TestPingList_WithCustomTags() {

assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int`")
require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time for the call")
_, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string))
_, err := time.Parse(s.timestampFormat, m["grpc.start_time"].(string))
assert.NoError(s.T(), err, "should be able to parse start time as RFC3339")

require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call")
_, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline as RFC3339")
_, err = time.Parse(s.timestampFormat, m["grpc.request.deadline"].(string))
require.NoError(s.T(), err, "should be able to parse deadline")
}

assert.Equal(s.T(), msgs[0]["msg"], "some pinglist", "msg must be the correct message")
Expand Down
7 changes: 4 additions & 3 deletions logging/logrus/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ func (s *loggingPingService) PingEmpty(ctx context.Context, empty *pb_testproto.

type logrusBaseSuite struct {
*grpc_testing.InterceptorTestSuite
mutexBuffer *grpc_testing.MutexReadWriter
buffer *bytes.Buffer
logger *logrus.Logger
mutexBuffer *grpc_testing.MutexReadWriter
buffer *bytes.Buffer
logger *logrus.Logger
timestampFormat string
}

func newLogrusBaseSuite(t *testing.T) *logrusBaseSuite {
Expand Down
Loading

0 comments on commit be4c235

Please sign in to comment.