Skip to content

Commit

Permalink
[Issue apache#145] fix producer cannot connect to broker through puls…
Browse files Browse the repository at this point in the history
…ar proxy (apache#146)

if producer connect to broker through pulsar proxy, producer will get Short read when reading frame size error because pulsar proxy not pass commands to broker.
  • Loading branch information
freeznet authored and wolfstudy committed Dec 31, 2019
1 parent 5004b5d commit 12a6a03
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 38 deletions.
2 changes: 1 addition & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func (pc *partitionConsumer) grabConn() error {
}

res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
pb.BaseCommand_SUBSCRIBE, cmdSubscribe, lr.ConnectingThroughProxy)

if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
Expand Down
44 changes: 24 additions & 20 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ type connection struct {
state connectionState
connectionTimeout time.Duration

logicalAddr *url.URL
physicalAddr *url.URL
cnx net.Conn
logicalAddr *url.URL
physicalAddr *url.URL
connectingThroughProxy bool
cnx net.Conn

writeBufferLock sync.Mutex
writeBuffer Buffer
Expand Down Expand Up @@ -152,20 +153,21 @@ type connection struct {
}

func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
connectionTimeout time.Duration, auth auth.Provider) *connection {
connectionTimeout time.Duration, auth auth.Provider, connectingThroughProxy bool) *connection {
cnx := &connection{
state: connectionInit,
connectionTimeout: connectionTimeout,
logicalAddr: logicalAddr,
physicalAddr: physicalAddr,
writeBuffer: NewBuffer(4096),
log: log.WithField("remote_addr", physicalAddr),
pendingReqs: make(map[uint64]*request),
lastDataReceivedTime: time.Now(),
pingTicker: time.NewTicker(keepAliveInterval),
pingCheckTicker: time.NewTicker(keepAliveInterval),
tlsOptions: tlsOptions,
auth: auth,
state: connectionInit,
connectionTimeout: connectionTimeout,
logicalAddr: logicalAddr,
physicalAddr: physicalAddr,
connectingThroughProxy: connectingThroughProxy,
writeBuffer: NewBuffer(4096),
log: log.WithField("remote_addr", physicalAddr),
pendingReqs: make(map[uint64]*request),
lastDataReceivedTime: time.Now(),
pingTicker: time.NewTicker(keepAliveInterval),
pingCheckTicker: time.NewTicker(keepAliveInterval),
tlsOptions: tlsOptions,
auth: auth,

closeCh: make(chan interface{}),
incomingRequestsCh: make(chan *request, 10),
Expand Down Expand Up @@ -248,14 +250,16 @@ func (c *connection) doHandshake() bool {
// During the initial handshake, the internal keep alive is not
// active yet, so we need to timeout write and read requests
c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))

c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{
cmdConnect := &pb.CommandConnect{
ProtocolVersion: &version,
ClientVersion: proto.String("Pulsar Go 0.1"),
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
}))

}
if c.connectingThroughProxy {
cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host)
}
c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect))
cmd, _, err := c.reader.readSingleCommand()
if err != nil {
c.log.WithError(err).Warn("Failed to perform initial handshake")
Expand Down
12 changes: 7 additions & 5 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package internal

import (
"fmt"
"net/url"
"sync"
"time"
Expand All @@ -30,7 +31,7 @@ import (
// ConnectionPool is a interface of connection pool.
type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, connectingThroughProxy bool) (Connection, error)

// Close all the connections in the pool
Close()
Expand All @@ -52,8 +53,9 @@ func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTim
}
}

func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
cachedCnx, found := p.pool.Load(logicalAddr.Host)
func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL,
connectingThroughProxy bool) (Connection, error) {
cachedCnx, found := p.pool.Load(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy))
if found {
cnx := cachedCnx.(*connection)
log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
Expand All @@ -68,8 +70,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
}

// Try to create a new connection
newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth))
newCnx, wasCached := p.pool.LoadOrStore(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy),
newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth, connectingThroughProxy))
cnx := newCnx.(*connection)
if !wasCached {
cnx.start()
Expand Down
12 changes: 7 additions & 5 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (

// LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.
type LookupResult struct {
LogicalAddr *url.URL
PhysicalAddr *url.URL
LogicalAddr *url.URL
PhysicalAddr *url.URL
ConnectingThroughProxy bool
}

// LookupService is a interface of lookup service.
Expand Down Expand Up @@ -105,7 +106,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
RequestId: &id,
Topic: &topic,
Authoritative: lr.Authoritative,
})
}, false)
if err != nil {
return nil, err
}
Expand All @@ -123,8 +124,9 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
}

return &LookupResult{
LogicalAddr: logicalAddress,
PhysicalAddr: physicalAddress,
LogicalAddr: logicalAddress,
PhysicalAddr: physicalAddress,
ConnectingThroughProxy: lr.GetProxyThroughServiceUrl(),
}, nil

case pb.CommandLookupTopicResponse_Failed:
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCo
}

func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) {
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
expectedRequest := &c.expectedRequests[0]
c.expectedRequests = c.expectedRequests[1:]
Expand Down
8 changes: 4 additions & 4 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type RPCClient interface {
RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error)

RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)

Expand All @@ -71,13 +71,13 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.

func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message, false)
}

func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) {
// TODO: Add retry logic in case of connection issues
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, connectingThroughProxy)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func (p *partitionProducer) grabCnx() error {
if len(p.options.Properties) > 0 {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}

res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer,
lr.ConnectingThroughProxy)
if err != nil {
p.log.WithError(err).Error("Failed to create producer")
return err
Expand Down

0 comments on commit 12a6a03

Please sign in to comment.