Skip to content

Commit

Permalink
define logger interface and add Logger field to ClientOptions (apache…
Browse files Browse the repository at this point in the history
…#323)

### Motivation
enable users to configure the logger used by the client and use their own implementation. If no logger is provided, a wrapped `logrus.StandardLogger()` will be used. <s>This PR only solved part of the problem mentioned in the issue https://github.com/apache/pulsar-client-go/issues/228.</s>

### Modifications

* define `Logger` and `Entry` interfaces used by the client
* add `Logger` field to ClientOptions
* add `logger` field to internal structures
* provide a logger implementation backed by logrus
* implement a no-op logger
  • Loading branch information
shohi authored Oct 9, 2020
1 parent 7fb79b7 commit 3ab75cd
Show file tree
Hide file tree
Showing 24 changed files with 442 additions and 135 deletions.
7 changes: 7 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/auth"
"github.com/apache/pulsar-client-go/pulsar/log"
)

func NewClient(options ClientOptions) (Client, error) {
Expand Down Expand Up @@ -102,6 +103,12 @@ type ClientOptions struct {

// Max number of connections to a single broker that will kept in the pool. (Default: 1 connection)
MaxConnectionsPerBroker int

// Configure the logger used by the client.
// By default, a wrapped logrus.StandardLogger will be used, namely,
// log.NewLoggerWithLogrus(logrus.StandardLogger())
// FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
Logger log.Logger
}

type Client interface {
Expand Down
21 changes: 16 additions & 5 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (

"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)

const (
Expand All @@ -42,16 +43,25 @@ type client struct {
rpcClient internal.RPCClient
handlers internal.ClientHandlers
lookupService internal.LookupService

log log.Logger
}

func newClient(options ClientOptions) (Client, error) {
var logger log.Logger
if options.Logger != nil {
logger = options.Logger
} else {
logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
}

if options.URL == "" {
return nil, newError(ResultInvalidConfiguration, "URL is required for client")
}

url, err := url.Parse(options.URL)
if err != nil {
log.WithError(err).Error("Failed to parse service URL")
logger.WithError(err).Error("Failed to parse service URL")
return nil, newError(ResultInvalidConfiguration, "Invalid service URL")
}

Expand Down Expand Up @@ -101,10 +111,11 @@ func newClient(options ClientOptions) (Client, error) {
}

c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost),
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost, logger),
log: logger,
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil)
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger)
c.handlers = internal.NewClientHandlers()
return c, nil
}
Expand Down
11 changes: 5 additions & 6 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)

var (
Expand Down Expand Up @@ -80,7 +79,7 @@ type consumer struct {
errorCh chan error
ticker *time.Ticker

log *log.Entry
log log.Logger
}

func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
Expand Down Expand Up @@ -146,11 +145,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
}

dlq, err := newDlqRouter(client, options.DLQ)
dlq, err := newDlqRouter(client, options.DLQ, client.log)
if err != nil {
return nil, err
}
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable)
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, client.log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -206,7 +205,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
log: log.WithField("topic", topic),
log: client.log.SubLogger(log.Fields{"topic": topic}),
consumerName: options.Name,
}

Expand Down
6 changes: 3 additions & 3 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

pkgerrors "github.com/pkg/errors"

log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar/log"
)

type multiTopicConsumer struct {
Expand All @@ -42,7 +42,7 @@ type multiTopicConsumer struct {
closeOnce sync.Once
closeCh chan struct{}

log *log.Entry
log log.Logger
}

func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
Expand All @@ -54,7 +54,7 @@ func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []str
closeCh: make(chan struct{}),
dlq: dlq,
rlq: rlq,
log: log.WithField("topics", topics),
log: client.log.SubLogger(log.Fields{"topic": topics}),
consumerName: options.Name,
}

Expand Down
19 changes: 10 additions & 9 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ import (

"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)

var (
Expand Down Expand Up @@ -159,7 +158,7 @@ type partitionConsumer struct {
nackTracker *negativeAcksTracker
dlq *dlqRouter

log *log.Entry
log log.Logger

compressionProviders map[pb.CompressionType]compression.Provider
}
Expand All @@ -185,16 +184,18 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
log: log.WithField("topic", options.topic),
}
pc.log = pc.log.WithField("name", pc.name).
WithField("subscription", options.subscription).
WithField("consumerID", pc.consumerID)
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
pc.log = client.log.SubLogger(log.Fields{
"name": pc.name,
"topic": options.topic,
"subscription": options.subscription,
"consumerID": pc.consumerID,
})
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, pc.log)

err := pc.grabConn()
if err != nil {
log.WithError(err).Errorf("Failed to create consumer")
pc.log.WithError(err).Error("Failed to create consumer")
return nil, err
}
pc.log.Info("Created consumer")
Expand Down
28 changes: 12 additions & 16 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import (

pkgerrors "github.com/pkg/errors"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
)

const (
Expand Down Expand Up @@ -59,7 +58,7 @@ type regexConsumer struct {

ticker *time.Ticker

log *log.Entry
log log.Logger

consumerName string
}
Expand All @@ -82,7 +81,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p

closeCh: make(chan struct{}),

log: log.WithField("topic", tn.Name),
log: c.log.SubLogger(log.Fields{"topic": tn.Name}),
consumerName: opts.Name,
}

Expand Down Expand Up @@ -280,13 +279,12 @@ func (c *regexConsumer) discover() {
newTopics := topicsDiff(topics, known)
staleTopics := topicsDiff(known, topics)

if log.GetLevel() == log.DebugLevel {
l := c.log.WithFields(log.Fields{
c.log.
WithFields(log.Fields{
"new_topics": newTopics,
"old_topics": staleTopics,
})
l.Debug("discover topics")
}
}).
Debug("discover topics")

c.unsubscribeCh <- staleTopics
c.subscribeCh <- newTopics
Expand All @@ -306,9 +304,7 @@ func (c *regexConsumer) knownTopics() []string {
}

func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
if log.GetLevel() == log.DebugLevel {
c.log.WithField("topics", topics).Debug("subscribe")
}
c.log.WithField("topics", topics).Debug("subscribe")
consumers := make(map[string]Consumer, len(topics))
for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
if ce.err != nil {
Expand All @@ -326,11 +322,11 @@ func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRou
}

func (c *regexConsumer) unsubscribe(topics []string) {
if log.GetLevel() == log.DebugLevel {
c.log.WithField("topics", topics).Debug("unsubscribe")
}
c.log.WithField("topics", topics).Debug("unsubscribe")

consumers := make(map[string]Consumer, len(topics))
c.consumersLock.Lock()

for _, t := range topics {
if consumer, ok := c.consumers[t]; ok {
consumers[t] = consumer
Expand All @@ -340,7 +336,7 @@ func (c *regexConsumer) unsubscribe(topics []string) {
c.consumersLock.Unlock()

for t, consumer := range consumers {
log.Debugf("unsubscribe from topic=%s subscription=%s", t, c.options.SubscriptionName)
c.log.Debugf("unsubscribe from topic=%s subscription=%s", t, c.options.SubscriptionName)
if err := consumer.Unsubscribe(); err != nil {
c.log.Warnf("unable to unsubscribe from topic=%s subscription=%s",
t, c.options.SubscriptionName)
Expand Down
9 changes: 5 additions & 4 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
)

func TestFilterTopics(t *testing.T) {
Expand Down Expand Up @@ -153,8 +154,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
AutoDiscoveryPeriod: 5 * time.Minute,
}

dlq, _ := newDlqRouter(c.(*client), nil)
rlq, _ := newRetryRouter(c.(*client), nil, false)
dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -202,8 +203,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
AutoDiscoveryPeriod: 5 * time.Minute,
}

dlq, _ := newDlqRouter(c.(*client), nil)
rlq, _ := newRetryRouter(c.(*client), nil, false)
dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 5 additions & 4 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar/log"
)

type dlqRouter struct {
Expand All @@ -32,13 +32,14 @@ type dlqRouter struct {
policy *DLQPolicy
messageCh chan ConsumerMessage
closeCh chan interface{}
log *log.Entry
log log.Logger
}

func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {
func newDlqRouter(client Client, policy *DLQPolicy, logger log.Logger) (*dlqRouter, error) {
r := &dlqRouter{
client: client,
policy: policy,
log: logger,
}

if policy != nil {
Expand All @@ -52,7 +53,7 @@ func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {

r.messageCh = make(chan ConsumerMessage)
r.closeCh = make(chan interface{}, 1)
r.log = log.WithField("dlq-topic", policy.DeadLetterTopic)
r.log = logger.SubLogger(log.Fields{"dlq-topic": policy.DeadLetterTopic})
go r.run()
}
return r, nil
Expand Down
16 changes: 9 additions & 7 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package internal
import (
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)

const (
Expand Down Expand Up @@ -63,12 +63,14 @@ type BatchBuilder struct {

compressionProvider compression.Provider
buffersPool BuffersPool

log log.Logger
}

// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool) (*BatchBuilder, error) {
bufferPool BuffersPool, logger log.Logger) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
Expand All @@ -92,6 +94,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
callbacks: []interface{}{},
compressionProvider: getCompressionProvider(compressionType, level),
buffersPool: bufferPool,
log: logger,
}

if compressionType != pb.CompressionType_NONE {
Expand Down Expand Up @@ -162,7 +165,7 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
// No-Op for empty batch
return nil, 0, nil
}
log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
bb.log.Debug("BatchBuilder flush: messages: ", bb.numMessages)

bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
Expand Down Expand Up @@ -198,7 +201,6 @@ func getCompressionProvider(compressionType pb.CompressionType,
case pb.CompressionType_ZSTD:
return compression.NewZStdProvider(level)
default:
log.Panic("unsupported compression type")
return nil
panic("unsupported compression type")
}
}
Loading

0 comments on commit 3ab75cd

Please sign in to comment.