
[Proxy] Race condition in Pulsar Proxy that causes UnsupportedOperationExceptions in Proxy logs #13923

Closed · lhotari opened this issue Jan 24, 2022 · 10 comments · Fixed by #14078 or #19327
Labels: area/proxy, type/bug

Comments

@lhotari
Member

lhotari commented Jan 24, 2022

Describe the bug

UnsupportedOperationExceptions commonly appear in the Proxy logs. This particular issue was reproduced frequently when geo-replication was configured between two clusters.

16:57:50.305 [pulsar-proxy-io-2-3] INFO  org.apache.pulsar.proxy.server.ProxyConnection - [/10.34.1.169:47600] New connection opened
16:57:50.329 [pulsar-proxy-io-2-3] INFO  org.apache.pulsar.proxy.server.ProxyConnection - [/10.34.1.169:47600] complete connection, init proxy handler. authenticated with token role superuser, hasProxyToBrokerUrl: false
16:57:50.331 [pulsar-proxy-io-2-3] WARN  io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.UnsupportedOperationException: null
        at org.apache.pulsar.common.protocol.PulsarDecoder.handleProducer(PulsarDecoder.java:479)
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:193)
        at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:193)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1371) [io.netty-netty-handler-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1245) [io.netty-netty-handler-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1285) [io.netty-netty-handler-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
        at java.lang.Thread.run(Thread.java:829) [?:?]

To Reproduce
Steps to reproduce the behavior:

  1. Set up two Pulsar clusters and enable geo-replication in a namespace that is shared across the clusters.
  2. Create a topic with 200 partitions in the replicated namespace.
  3. Restart all brokers in one cluster so that the geo-replication connections re-initialize; repeat until the issue shows up in the Pulsar Proxy logs.
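For reference, a minimal sketch of the setup side of these steps using pulsar-admin; the tenant, namespace, and cluster names (my-tenant, geo-ns, cluster-a, cluster-b) are placeholders, not values from this report:

```shell
# Placeholder names: my-tenant, geo-ns, cluster-a, cluster-b are illustrative.
# Allow the tenant in both clusters, create a namespace replicated across them,
# then create a heavily partitioned topic to maximize reconnection churn.
bin/pulsar-admin tenants create my-tenant --allowed-clusters cluster-a,cluster-b
bin/pulsar-admin namespaces create my-tenant/geo-ns --clusters cluster-a,cluster-b
bin/pulsar-admin topics create-partitioned-topic persistent://my-tenant/geo-ns/repro -p 200
```

With this in place, rolling-restarting the brokers of one cluster forces the geo-replication connections through the proxy to re-initialize repeatedly.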

Expected behavior
The race condition should be handled in the Pulsar Proxy so that these exceptions do not occur.

@lhotari lhotari added type/bug The PR fixed a bug or issue reported a bug area/proxy labels Jan 24, 2022
@lhotari lhotari self-assigned this Jan 24, 2022
@lhotari
Member Author

lhotari commented Jan 24, 2022

I'm working on a fix for this issue.

@lhotari
Member Author

lhotari commented Jan 25, 2022

One explanation for why the current solution doesn't work as expected can be found in the javadoc of https://netty.io/4.1/api/io/netty/handler/flow/FlowControlHandler.html . The Pulsar Proxy doesn't currently use FlowControlHandler, so multiple messages can be queued in the pipeline even though the implementation calls read() one-by-one.
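As a sketch of what using FlowControlHandler could look like (the initializer class and handler name here are hypothetical, not the proxy's actual pipeline setup):

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.flow.FlowControlHandler;

// Illustrative sketch only: ProxyChannelInitializer and "flowController" are
// hypothetical names, not part of the Pulsar Proxy codebase.
public class ProxyChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // FlowControlHandler buffers inbound messages and releases exactly one
        // per read() call. A later handler that disables auto-read and calls
        // read() one-by-one then really does see one message at a time, instead
        // of a burst of messages that were already sitting in the pipeline.
        ch.pipeline().addLast("flowController", new FlowControlHandler());
        // ... the frame decoder and the ProxyConnection handler would follow here ...
    }
}
```

The key property is that without FlowControlHandler, disabling auto-read only throttles the socket, not messages already decoded and queued in the pipeline.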

lhotari added a commit to lhotari/pulsar that referenced this issue Jan 31, 2022
Fixes apache#14075
Fixes apache#13923

- Optimize the proxy connection to fail fast if the target broker isn't active
  - This reduces the number of hanging connections, since unavailable brokers are no longer contacted unnecessarily.
  - The Pulsar client will retry connecting after a backoff timeout.

- Fix the race condition in the Pulsar Proxy when opening a connection, which could
  lead to invalid states and hanging connections.

- Add connect timeout handling to the proxy connection
  - Defaults to 10000 ms, which is also the default of the client's connect timeout.

- Add read timeout handling to the incoming connection and the proxied connection
  - The ping/pong keepalive messages should prevent the timeout from triggering;
    however, the connection can end up in a state where keepalives aren't happening.
  - A connection-level read timeout therefore prevents broken connections from being
    left hanging in the proxy.
lhotari added a commit to lhotari/pulsar that referenced this issue Jan 31, 2022
lhotari added a commit to lhotari/pulsar that referenced this issue Jan 31, 2022
lhotari added a commit to lhotari/pulsar that referenced this issue Feb 4, 2022
lhotari added a commit to lhotari/pulsar that referenced this issue Feb 7, 2022
@lhotari
Member Author

lhotari commented Jan 16, 2023

This problem remains. This log message appears in production environments and also in integration tests.

A recent example: https://github.com/apache/pulsar/actions/runs/3927573395/jobs/6714826271#step:12:7081

  ... org.apache.pulsar.proxy.server.ProxyConnection - [/172.18.0.1:36040] complete connection, init proxy handler. authenticated with none role null, hasProxyToBrokerUrl: false
  2023-01-16T06:43:38,709+0000 [pulsar-proxy-io-2-1] WARN  io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
  java.lang.UnsupportedOperationException: null
  	at org.apache.pulsar.common.protocol.PulsarDecoder.handleSubscribe(PulsarDecoder.java:532) ~[org.apache.pulsar-pulsar-common-2.12.0-SNAPSHOT.jar:2.12.0-SNAPSHOT]
  	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:243) ~[org.apache.pulsar-pulsar-common-2.12.0-SNAPSHOT.jar:2.12.0-SNAPSHOT]
  	at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:254) ~[org.apache.pulsar-pulsar-proxy-2.12.0-SNAPSHOT.jar:2.12.0-SNAPSHOT]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[io.netty-netty-handler-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.netty-netty-transport-classes-epoll-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) ~[io.netty-netty-transport-classes-epoll-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) ~[io.netty-netty-transport-classes-epoll-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.86.Final.jar:4.1.86.Final]
  	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.86.Final.jar:4.1.86.Final]
  	at java.lang.Thread.run(Thread.java:833) ~[?:?]
  2023-01-16T06:43:38,718+0000 [pulsar-proxy-io-2-1] WARN  org.apache.pulsar.proxy.server.ProxyConnection - [/172.18.0.1:36040] Got exception UnsupportedOperationException : Message: null State: ProxyLookupRequests
  java.lang.UnsupportedOperationException: null
  	(same stack trace as above)
  2023-01-16T06:43:38,729+0000 [pulsar-proxy-io-2-1] INFO  org.apache.pulsar.proxy.server.ProxyConnection - [/172.18.0.1:36040] Connection closed
  2023-01-16T06:43:38,846+0000 [pulsar-proxy-io-2-4] INFO  org.apache.pulsar.proxy.server.ProxyConnection - [/172.18.0.1:36052] New connection opened
  2023-01-16T06:43:38,849+0000 [pulsar-proxy-io-2-4] INFO  org.apache.pulsar.proxy.server.ProxyConnection - [/172.18.0.1:36052] complete connection, init proxy handler. authenticated with none role null, hasProxyToBrokerUrl: true

@lhotari lhotari reopened this Jan 16, 2023
@lhotari
Member Author

lhotari commented Jan 16, 2023

One interesting detail is that the log shows hasProxyToBrokerUrl: false when the problem occurs.

@michaeljmarshall
Member

Is the issue how we handle the upstream broker closing the channel?

    public void channelInactive(ChannelHandlerContext ctx) {
        inboundChannel.close();
    }

I don't yet know the lifecycle of the ProxyConnection when the upstream broker closes the connection, but it seems like we should implement these methods so that the proxy responds with a retry-able failure when it receives commands that were intended for the upstream broker.

@michaeljmarshall
Member

@lhotari great observation about hasProxyToBrokerUrl: false. I wonder if the underlying issue is actually the client sending the wrong command. A Subscribe protocol message should never be sent on a proxy connection that is used for lookups.
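To illustrate the invariant being described, here is a small, hypothetical model (none of these names exist in the Pulsar codebase) of a per-state allow-list that would reject a Subscribe on a lookup-only connection instead of falling through to an unimplemented decoder method:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model only: CommandType and LookupStateGuard are illustrative
// names, not part of Pulsar.
enum CommandType { LOOKUP, PARTITIONED_METADATA, SUBSCRIBE, PRODUCER }

final class LookupStateGuard {
    // Commands a connection in the ProxyLookupRequests state is expected to see.
    private static final Set<CommandType> ALLOWED =
            EnumSet.of(CommandType.LOOKUP, CommandType.PARTITIONED_METADATA);

    // Returns true if the command is valid for a lookup-only proxy connection;
    // anything else should be answered with an error response rather than
    // surfacing as an UnsupportedOperationException in the logs.
    static boolean permitted(CommandType cmd) {
        return ALLOWED.contains(cmd);
    }
}
```

In the actual issue, the Subscribe arriving on a lookup connection means the guard would trip, which is exactly the symptom seen in the proxy logs.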

@michaeljmarshall
Member

michaeljmarshall commented Jan 25, 2023

Additional observations based on this test log: https://github.com/apache/pulsar/actions/runs/3927573395/jobs/6714826271#step:12:7081.

We see this log line once:

2023-01-16T06:50:07,428 - INFO - [pulsar-client-io-101-2:ClientCnx@269] - [id: 0x053a2e53, L:/127.0.0.1:52810 - R:localhost/127.0.0.1:32794] Connected through proxy to target broker at localhost:6650

We see this one 39 times (where the broker's number changes):

2023-01-16T06:43:38,845 - INFO - [pulsar-client-io-5-2:ClientCnx@269] - [id: 0x30c84405, L:/127.0.0.1:44442 - R:localhost/127.0.0.1:32784] Connected through proxy to target broker at pulsar-broker-1:6650

The connection to the proxy that fails is associated with this log line:

2023-01-16T06:43:38,686+0000 [pulsar-proxy-io-2-1] INFO org.apache.pulsar.proxy.server.ProxyConnection - [/172.18.0.1:36040] New connection opened

Intriguingly, port 36040 is never referenced as having connected to the broker. That makes sense from the proxy side, but not from the client side, because the client sent a Subscribe command. I don't see it yet, but this seems like a Java client bug.

@michaeljmarshall
Member

I am pretty confident I found the root cause. If you update the ConnectionPool to use the following logic, the ProxyTest class fails with the same errors:

createConnection(physicalAddress).thenAcceptAsync(channel -> {
    log.info("[{}] Connected to server", channel);

    channel.closeFuture().addListener(v -> {
        // Remove connection from pool when it gets closed
        if (log.isDebugEnabled()) {
            log.debug("Removing closed connection from pool: {}", v);
        }
        cleanupConnection(logicalAddress, connectionKey, cnxFuture);
    });

    // We are connected to broker, but need to wait until the connect/connected
    // handshake is complete
    final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
    if (!channel.isActive() || cnx == null) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Connection was already closed by the time we got notified", channel);
        }
        cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
        return;
    }

    if (!logicalAddress.equals(physicalAddress)) {
        // We are connecting through a proxy. We need to set the target broker in the
        // ClientCnx object so that it can be specified when sending the CommandConnect.
        // That phase will happen in ClientCnx.connectionActive(), which is invoked
        // immediately after this method.
        cnx.setTargetBroker(logicalAddress);
    }

    cnx.setRemoteHostName(physicalAddress.getHostString());

    cnx.connectionFuture().thenRun(() -> {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Connection handshake completed", cnx.channel());
        }
        cnxFuture.complete(cnx);
    }).exceptionally(exception -> {
        log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage());
        cnxFuture.completeExceptionally(exception);
        cleanupConnection(logicalAddress, connectionKey, cnxFuture);
        cnx.ctx().close();
        return null;
    });
}, ForkJoinPool.commonPool())

The one detail I haven't found is why the callback sometimes runs late.

Note that this explanation aligns closely with the earlier observation that we didn't see the client log about connecting to the broker. The log would have come from here:

if (proxyToTargetBrokerAddress == null) {
    if (log.isDebugEnabled()) {
        log.debug("{} Connected to broker", ctx.channel());
    }
} else {
    log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
}

@michaeljmarshall
Member

One explanation could be that it depends on which thread we're in, as this code suggests:

https://github.com/netty/netty/blob/22d31519bc3fe34973a59b0fb43bbd580906388f/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java#L484-L506
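The hazard can be illustrated with plain JDK futures (this class is a standalone demo, not Pulsar code): when the completion callback is scheduled with thenAcceptAsync and an executor, the completing thread returns from complete() immediately and races ahead of the callback, which mirrors how the channel can start reading before setTargetBroker() has run:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;

// Standalone demo of async-callback scheduling, not Pulsar code.
public class AsyncCallbackOrdering {
    public static String callbackThreadName() {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        CompletableFuture<String> connected = new CompletableFuture<>();

        connected.thenAcceptAsync(ch -> {
            // With thenAcceptAsync(..., executor) this runs on an executor
            // thread, some time after the completing thread has moved on --
            // the window in which the connection can receive commands before
            // the callback has finished configuring it.
            name.set(Thread.currentThread().getName());
            done.countDown();
        }, ForkJoinPool.commonPool());

        // The "event loop" completes the future and continues immediately;
        // it does not wait for the callback above.
        connected.complete("channel");
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }
}
```

With a plain thenAccept on an already-completed future, the callback would instead run inline on the completing thread, which is the ordering the original code implicitly relied on.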

@lhotari
Member Author

lhotari commented Jan 26, 2023

Awesome job on the investigation @michaeljmarshall !
