Skip to content

Commit

Permalink
[improve] PIP-368: Support lookup based on the lookup properties (#1303)
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie authored Nov 1, 2024
1 parent 8c193de commit ccaf552
Show file tree
Hide file tree
Showing 9 changed files with 1,299 additions and 1,202 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ lint: bin/golangci-lint
bin/golangci-lint run

bin/golangci-lint:
GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2
GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.61.0

# an alternative to above `make lint` command
# use golangCi-lint docker to avoid local golang env issues
Expand Down
8 changes: 8 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ type ClientOptions struct {
// Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput.
// Config less than 0 indicates off memory limit.
MemoryLimitBytes int64

// Set the properties used for topic lookup.
// When the broker performs topic lookup, these lookup properties will be taken into consideration in a customized
// load manager.
// Note: The lookup properties are only used in topic lookup when:
// The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://"
// The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface
LookupProperties map[string]string
}

// Client represents a pulsar client
Expand Down
2 changes: 1 addition & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func newClient(options ClientOptions) (Client, error) {
}

c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics,
options.ListenerName, tlsConfig, authProvider)
options.ListenerName, tlsConfig, authProvider, toKeyValues(options.LookupProperties))

c.lookupService = c.rpcClient.LookupService("")

Expand Down
54 changes: 54 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4691,3 +4691,57 @@ func TestPartitionConsumerGetLastMessageIDs(t *testing.T) {
}

}

func TestLookupConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
LookupProperties: map[string]string{
"broker.id": "1",
},
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}

// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
assert.Equal(t, []byte(expectMsg), msg.Payload())
// ack message
consumer.Ack(msg)
}
}
6 changes: 5 additions & 1 deletion pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,21 @@ type lookupService struct {
serviceNameResolver ServiceNameResolver
tlsEnabled bool
listenerName string
lookupProperties []*pb.KeyValue
log log.Logger
metrics *Metrics
}

// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, listenerName string, logger log.Logger, metrics *Metrics) LookupService {
tlsEnabled bool, listenerName string,
lookupProperties []*pb.KeyValue, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
lookupProperties: lookupProperties,
metrics: metrics,
listenerName: listenerName,
}
Expand Down Expand Up @@ -146,6 +149,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
Topic: &topic,
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(ls.listenerName),
Properties: ls.lookupProperties,
})
if err != nil {
return nil, err
Expand Down
46 changes: 29 additions & 17 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -138,6 +138,7 @@ func TestLookupSuccess(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -149,9 +150,8 @@ func TestLookupSuccess(t *testing.T) {
},
},
}

metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -165,6 +165,7 @@ func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -174,6 +175,7 @@ func TestTlsLookupSuccess(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -187,7 +189,7 @@ func TestTlsLookupSuccess(t *testing.T) {
}
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)

ls := NewLookupService(mockedClient, url, serviceNameResolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -201,6 +203,7 @@ func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -210,6 +213,7 @@ func TestLookupWithProxy(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -223,7 +227,7 @@ func TestLookupWithProxy(t *testing.T) {
},
}
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -236,7 +240,7 @@ func TestLookupWithProxy(t *testing.T) {
func TestTlsLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -246,6 +250,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -260,7 +265,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -273,7 +278,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
func TestLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
expectedURL: "pulsar://broker-2:6650",
Expand All @@ -284,6 +289,7 @@ func TestLookupWithRedirect(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
{
RequestId: proto.Uint64(2),
Expand All @@ -309,7 +315,7 @@ func TestLookupWithRedirect(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -322,7 +328,7 @@ func TestLookupWithRedirect(t *testing.T) {
func TestTlsLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
expectedURL: "pulsar+ssl://broker-2:6651",
Expand All @@ -333,6 +339,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
{
RequestId: proto.Uint64(2),
Expand All @@ -359,7 +366,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {

resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -372,7 +379,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
func TestLookupWithInvalidUrlResponse(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -382,6 +389,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -396,7 +404,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand All @@ -406,7 +414,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
func TestLookupWithLookupFailure(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -416,6 +424,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -429,7 +438,7 @@ func TestLookupWithLookupFailure(t *testing.T) {

resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -509,6 +518,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
t: t,

Expand All @@ -525,7 +535,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(),
}, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer))

metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
Expand All @@ -539,6 +549,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

Expand All @@ -548,6 +559,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
Topic: proto.String("my-topic"),
AdvertisedListenerName: proto.String(""),
Authoritative: proto.Bool(false),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -558,7 +570,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(),
}, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer))

lr, err := ls.Lookup("my-topic")
Expand Down
Loading

0 comments on commit ccaf552

Please sign in to comment.