From 12a6a03e9c31fa6892f4219e5850b5d7d298e889 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 31 Dec 2019 13:01:38 +0800 Subject: [PATCH] [Issue #145] fix producer cannot connect to broker through pulsar proxy (#146) If a producer connects to a broker through the Pulsar proxy, the producer gets a "Short read when reading frame size" error, because the Pulsar proxy does not pass the commands on to the broker. --- pulsar/consumer_partition.go | 2 +- pulsar/internal/connection.go | 44 ++++++++++++++------------ pulsar/internal/connection_pool.go | 12 ++++--- pulsar/internal/lookup_service.go | 12 ++++--- pulsar/internal/lookup_service_test.go | 2 +- pulsar/internal/rpc_client.go | 8 ++--- pulsar/producer_partition.go | 4 +-- 7 files changed, 46 insertions(+), 38 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 55d13183c0c63..9d47581e2d5aa 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -530,7 +530,7 @@ func (pc *partitionConsumer) grabConn() error { } res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID, - pb.BaseCommand_SUBSCRIBE, cmdSubscribe) + pb.BaseCommand_SUBSCRIBE, cmdSubscribe, lr.ConnectingThroughProxy) if err != nil { pc.log.WithError(err).Error("Failed to create consumer") diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 7a7cd5f5fb84b..590b92f2eabe5 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -119,9 +119,10 @@ type connection struct { state connectionState connectionTimeout time.Duration - logicalAddr *url.URL - physicalAddr *url.URL - cnx net.Conn + logicalAddr *url.URL + physicalAddr *url.URL + connectingThroughProxy bool + cnx net.Conn writeBufferLock sync.Mutex writeBuffer Buffer @@ -152,20 +153,21 @@ type connection struct { } func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions, - connectionTimeout time.Duration, auth auth.Provider) *connection { + connectionTimeout 
time.Duration, auth auth.Provider, connectingThroughProxy bool) *connection { cnx := &connection{ - state: connectionInit, - connectionTimeout: connectionTimeout, - logicalAddr: logicalAddr, - physicalAddr: physicalAddr, - writeBuffer: NewBuffer(4096), - log: log.WithField("remote_addr", physicalAddr), - pendingReqs: make(map[uint64]*request), - lastDataReceivedTime: time.Now(), - pingTicker: time.NewTicker(keepAliveInterval), - pingCheckTicker: time.NewTicker(keepAliveInterval), - tlsOptions: tlsOptions, - auth: auth, + state: connectionInit, + connectionTimeout: connectionTimeout, + logicalAddr: logicalAddr, + physicalAddr: physicalAddr, + connectingThroughProxy: connectingThroughProxy, + writeBuffer: NewBuffer(4096), + log: log.WithField("remote_addr", physicalAddr), + pendingReqs: make(map[uint64]*request), + lastDataReceivedTime: time.Now(), + pingTicker: time.NewTicker(keepAliveInterval), + pingCheckTicker: time.NewTicker(keepAliveInterval), + tlsOptions: tlsOptions, + auth: auth, closeCh: make(chan interface{}), incomingRequestsCh: make(chan *request, 10), @@ -248,14 +250,16 @@ func (c *connection) doHandshake() bool { // During the initial handshake, the internal keep alive is not // active yet, so we need to timeout write and read requests c.cnx.SetDeadline(time.Now().Add(keepAliveInterval)) - - c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{ + cmdConnect := &pb.CommandConnect{ ProtocolVersion: &version, ClientVersion: proto.String("Pulsar Go 0.1"), AuthMethodName: proto.String(c.auth.Name()), AuthData: authData, - })) - + } + if c.connectingThroughProxy { + cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host) + } + c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect)) cmd, _, err := c.reader.readSingleCommand() if err != nil { c.log.WithError(err).Warn("Failed to perform initial handshake") diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 14f775364e23e..efb5cf117b1dc 
100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -18,6 +18,7 @@ package internal import ( + "fmt" "net/url" "sync" "time" @@ -30,7 +31,7 @@ import ( // ConnectionPool is a interface of connection pool. type ConnectionPool interface { // GetConnection get a connection from ConnectionPool. - GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) + GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, connectingThroughProxy bool) (Connection, error) // Close all the connections in the pool Close() @@ -52,8 +53,9 @@ func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTim } } -func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) { - cachedCnx, found := p.pool.Load(logicalAddr.Host) +func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, + connectingThroughProxy bool) (Connection, error) { + cachedCnx, found := p.pool.Load(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy)) if found { cnx := cachedCnx.(*connection) log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr) @@ -68,8 +70,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U } // Try to create a new connection - newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host, - newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth)) + newCnx, wasCached := p.pool.LoadOrStore(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy), + newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth, connectingThroughProxy)) cnx := newCnx.(*connection) if !wasCached { cnx.start() diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index ee6dffe609ad7..e056ea5c00da9 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -30,8 +30,9 @@ import ( // LookupResult 
encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr. type LookupResult struct { - LogicalAddr *url.URL - PhysicalAddr *url.URL + LogicalAddr *url.URL + PhysicalAddr *url.URL + ConnectingThroughProxy bool } // LookupService is a interface of lookup service. @@ -105,7 +106,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) { RequestId: &id, Topic: &topic, Authoritative: lr.Authoritative, - }) + }, false) if err != nil { return nil, err } @@ -123,8 +124,9 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) { } return &LookupResult{ - LogicalAddr: logicalAddress, - PhysicalAddr: physicalAddress, + LogicalAddr: logicalAddress, + PhysicalAddr: physicalAddress, + ConnectingThroughProxy: lr.GetProxyThroughServiceUrl(), }, nil case pb.CommandLookupTopicResponse_Failed: diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go index 5bb17242cf325..cce619808c541 100644 --- a/pulsar/internal/lookup_service_test.go +++ b/pulsar/internal/lookup_service_test.go @@ -70,7 +70,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCo } func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, - cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { + cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) { assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP) expectedRequest := &c.expectedRequests[0] c.expectedRequests = c.expectedRequests[1:] diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 23c3b61917878..3b5e6227bc9bc 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -45,7 +45,7 @@ type RPCClient interface { RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID 
uint64, - cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) + cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) @@ -71,13 +71,13 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time. func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { - return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message) + return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message, false) } func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, - cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { + cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) { // TODO: Add retry logic in case of connection issues - cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr) + cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, connectingThroughProxy) if err != nil { return nil, err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f96a706ab21a7..09635702119a6 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -140,8 +140,8 @@ func (p *partitionProducer) grabCnx() error { if len(p.options.Properties) > 0 { cmdProducer.Metadata = toKeyValues(p.options.Properties) } - - res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) + res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer, + lr.ConnectingThroughProxy) if err != nil { p.log.WithError(err).Error("Failed to create producer") return err