
mss.handle never resolves when optimisticSelect #2321

Closed · marcus-pousette opened this issue Dec 18, 2023 · 11 comments · Fixed by #2331
Labels
need/triage Needs initial labeling and prioritization

Comments

marcus-pousette (Contributor) commented Dec 18, 2023

  • Version:

libp2p: 1.0.10

  • Platform:

Darwin MacBook-Pro-3 23.0.0

  • Subsystem:

multistream-select (?)

  • Severity:

Critical/High

Description:

I am in the process of upgrading to v1 from 0.46. After upgrading, I am no longer able to receive incoming streams in my protocol handler - something that worked prior to the libp2p version upgrade.

Specifically, the handler registered with the registrar never receives any new incoming streams:

this.components.registrar.handle(multicodec, this.onIncomingStream, options) 

Going through this more, it seems that I do get incoming streams, but the code never resolves at mss.handle:

const { stream, protocol } = await mss.handle(muxedStream, protocols, {

I get the same results when using yamux and mplex.
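
For context, a minimal sketch of the setup (assuming js-libp2p v1 APIs; the transport and muxer choices below are illustrative, not my exact configuration):

import { createLibp2p } from 'libp2p'
import { tcp } from '@libp2p/tcp'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'

const protocol = '/test/0.0.0'

const node = await createLibp2p({
  addresses: { listen: ['/ip4/127.0.0.1/tcp/0'] },
  transports: [tcp()],
  connectionEncryption: [noise()],
  streamMuxers: [yamux()]
})

// the handler that should fire for incoming streams -- with
// optimisticSelect in play it never does, because mss.handle on the
// inbound side never resolves
await node.handle(protocol, ({ stream }) => {
  // never reached
})

// meanwhile, both peers dial each other at roughly the same time:
// await node.dialProtocol(remoteAddr, protocol)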

These are the logs I get when both nodes attempt to open a stream to the other node at the same time:

libp2p:connection:outbound:2ei1eo1702901551113 starting new stream for protocols [ '/test/0.0.0' ] +5s
  libp2p:yamux:outbound:3 no abort signal was passed while trying to negotiate protocols [ '/test/0.0.0' ] falling back to default timeout +0ms
  libp2p:yamux:outbound:3 selected protocol /test/0.0.0 +0ms
  libp2p:connection:outbound:2ei1eo1702901551113 starting new stream for protocols [ '/ipfs/id/push/1.0.0' ] +5ms
  libp2p:yamux:outbound:5 selected protocol /ipfs/id/push/1.0.0 +0ms
  libp2p:yamux frame for missing stream id=1 +0ms
  libp2p:yamux frame for missing stream id=1 +0ms
  libp2p:yamux frame for missing stream id=1 +0ms
  libp2p:yamux frame for missing stream id=1 +2ms
  libp2p:identify identify completed for peer 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw and protocols [ '/ipfs/id/1.0.0', '/ipfs/id/push/1.0.0', '/libp2p/circuit/relay/0.2.0/hop', '/test/0.0.0' ] +0ms
  libp2p:identify our observed address is /ip4/127.0.0.1/tcp/60410/p2p/12D3KooWLT7iLouzF7NbAZuV2SHf528RMxZMrg2Dh66JmBkaSBK4 +0ms
  libp2p:identify storing our observed address /ip4/127.0.0.1/tcp/60410/p2p/12D3KooWLT7iLouzF7NbAZuV2SHf528RMxZMrg2Dh66JmBkaSBK4 +0ms
  libp2p:identify received identify from 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw +0ms
  libp2p:identify received signedPeerRecord from 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw +0ms
  libp2p:identify patching 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw with {
  addresses: [
    {
      isCertified: true,
      multiaddr: Multiaddr(/ip4/127.0.0.1/tcp/60409)
    },
    {
      isCertified: true,
      multiaddr: Multiaddr(/ip4/127.0.0.1/tcp/60411/ws)
    }
  ],
  protocols: [
    '/ipfs/id/1.0.0',
    '/ipfs/id/push/1.0.0',
    '/libp2p/circuit/relay/0.2.0/hop',
    '/test/0.0.0'
  ],
  publicKey: <Buffer 08 01 12 20 cf 3c 79 9d 4d b6 76 2a 71 f5 0b 1d 43 ad 86 47 2e 6f b4 ce ee 4c 07 27 10 8d fc c3 a8 51 ed 76>,
  metadata: Map(0) {},
  peerRecordEnvelope: <Buffer 0a 24 08 01 12 20 cf 3c 79 9d 4d b6 76 2a 71 f5 0b 1d 43 ad 86 47 2e 6f b4 ce ee 4c 07 27 10 8d fc c3 a8 51 ed 76 12 02 03 01 1a 49 0a 26 00 24 08 01 ... 133 more bytes>
} +2ms
  libp2p:identify merging 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw metadata {
  AgentVersion: Uint8Array(31) [
    108, 105, 98, 112,  50, 112,  47,  49,
     46,  48, 46,  49,  48,  32,  85, 115,
    101, 114, 65, 103, 101, 110, 116,  61,
    118,  49, 56,  46,  48,  46,  48
  ],
  ProtocolVersion: Uint8Array(10) [
    105, 112, 102, 115, 47,
     48,  46,  49,  46, 48
  ]
} +1ms
  libp2p:connection:inbound:cuesk11702901551127 incoming stream opened on /ipfs/id/push/1.0.0 +5s
  libp2p:connection:inbound:cuesk11702901551127 starting new stream for protocols [ '/test/0.0.0' ] +4s
  libp2p:yamux:outbound:4 no abort signal was passed while trying to negotiate protocols [ '/test/0.0.0' ] falling back to default timeout +0ms
  libp2p:yamux:outbound:4 selected protocol /test/0.0.0 +1ms
  libp2p:identify received identify from 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw +4s
  libp2p:identify received signedPeerRecord from 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw +0ms
  libp2p:identify patching 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw with {
  addresses: [
    {
      isCertified: true,
      multiaddr: Multiaddr(/ip4/127.0.0.1/tcp/60409)
    },
    {
      isCertified: true,
      multiaddr: Multiaddr(/ip4/127.0.0.1/tcp/60411/ws)
    }
  ],
  protocols: [
    '/ipfs/id/1.0.0',
    '/ipfs/id/push/1.0.0',
    '/libp2p/circuit/relay/0.2.0/hop',
    '/test/0.0.0'
  ],
  metadata: Map(2) {
    'AgentVersion' => Uint8Array(31) [
      108, 105, 98, 112,  50, 112,  47,  49,
       46,  48, 46,  49,  48,  32,  85, 115,
      101, 114, 65, 103, 101, 110, 116,  61,
      118,  49, 56,  46,  48,  46,  48
    ],
    'ProtocolVersion' => Uint8Array(10) [
      105, 112, 102, 115, 47,
       48,  46,  49,  46, 48
    ]
  },
  peerRecordEnvelope: <Buffer 0a 24 08 01 12 20 cf 3c 79 9d 4d b6 76 2a 71 f5 0b 1d 43 ad 86 47 2e 6f b4 ce ee 4c 07 27 10 8d fc c3 a8 51 ed 76 12 02 03 01 1a 49 0a 26 00 24 08 01 ... 133 more bytes>
} +1ms
  libp2p:identify merging 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw metadata {
  AgentVersion: Uint8Array(31) [
    108, 105, 98, 112,  50, 112,  47,  49,
     46,  48, 46,  49,  48,  32,  85, 115,
    101, 114, 65, 103, 101, 110, 116,  61,
    118,  49, 56,  46,  48,  46,  48
  ],
  ProtocolVersion: Uint8Array(10) [
    105, 112, 102, 115, 47,
     48,  46,  49,  46, 48
  ]
} +1ms
  libp2p:identify handled push from 12D3KooWPmL3fL7d8kmW643NnM1gppd4rhvCJzHAKHKyotQaULFw +1ms
  libp2p:yamux frame for missing stream id=2 +0ms
  libp2p:yamux frame for missing stream id=2 +0ms
  libp2p:yamux frame for missing stream id=2 +0ms
  libp2p:yamux frame for missing stream id=2 +1ms
  libp2p:yamux frame for missing stream id=5 +0ms
  libp2p:yamux discarding data for stream id=5 +0ms
  libp2p:yamux discarding data for stream id=5 +0ms
  libp2p:yamux frame for missing stream id=5 +1ms
  

I wonder whether #2318 is intended to fix this issue, or whether some other work related to 'optimistic' stream selection is relevant here.

@marcus-pousette marcus-pousette added the need/triage Needs initial labeling and prioritization label Dec 18, 2023
marcus-pousette (Contributor, Author) commented Dec 18, 2023

Removing this line

return optimisticSelect(stream, protocols[0], options)

makes everything work again. So it feels like there is some funky concurrency issue, where perhaps the protocols.length === 1 condition is not sufficient to decide to use "optimisticSelect", or there are some side effects of using that method that affect subsequent protocol selection.
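
For reference, the dispatch this points at is roughly the following. This is my paraphrase of the logic being discussed, not the actual multistream-select source; negotiate is a placeholder name for the non-optimistic path and the signatures are assumptions:

type Select = (stream: unknown, protocols: string[], options?: unknown) => Promise<unknown>

declare const optimisticSelect: (stream: unknown, protocol: string, options?: unknown) => Promise<unknown>
declare const negotiate: Select

const select: Select = async (stream, protocols, options) => {
  if (protocols.length === 1) {
    // single candidate: the protocol header is written lazily,
    // together with the first data chunk
    return optimisticSelect(stream, protocols[0], options)
  }

  // multiple candidates: propose each protocol over the wire until
  // the remote acknowledges one
  return negotiate(stream, protocols, options)
}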

@marcus-pousette marcus-pousette changed the title mss.handle never resolves for incomingStream mss.handle never resolves when optimisticSelect Dec 18, 2023
achingbrain (Member) commented:

#2318 resolves a problem where, for very short-lived streams that send a small amount of data and then immediately close, we'd end up closing the stream before negotiation had finished.

Is this similar to what you are doing, or something else? If it's something else, can you link me to an example?

marcus-pousette (Contributor, Author) commented:

> #2318 resolves a problem where, for very short-lived streams that send a small amount of data and then immediately close, we'd end up closing the stream before negotiation had finished.
>
> Is this similar to what you are doing, or something else? If it's something else, can you link me to an example?

I did try to patch my lib with those changes but was not successful in eliminating the problem. Only removing the optimisticSelect call has fixed it so far for me.

I am doing a similar thing to how gossipsub handles and sets up new streams, so I am curious how no one else has hit this problem yet (though it seems the gossipsub project only tests their protocol against a mocked client).

Here https://github.com/dao-xyz/peerbit/blob/2bb0734e764c58c8c0988f70c76e3786f75a05d0/packages/transport/stream/src/index.ts#L538

I usually receive incoming streams, but now I no longer receive them because mss.handle never resolves, so the registrar.handle events are never emitted.

Things that might differ between my setup and others': I am using circuit relay v2 and the identify protocol, and both sides try to open a stream to the other side at the same time once they have successfully established a connection (on topology changes).

marcus-pousette (Contributor, Author) commented:

I also tried adding a 5s delay before opening a stream on a new connection, to see whether the issue was related to other protocols needing to go through mss.handle first, but with no success.

marcus-pousette (Contributor, Author) commented Dec 19, 2023

Here

const originalSink = stream.sink.bind(stream)

I enter with:

/noise
/mplex/6.7.0
/ipfs/id/1.0.0
/test/0.0.0 <-- my protocol handler
/ipfs/id/push/1.0.0

But I only seem to send the protocol for:
/noise
/mplex/6.7.0
/ipfs/id/push/1.0.0
/ipfs/id/1.0.0

Interestingly, at the exact moment my test fails (i.e. it starts to close the outbound stream of my protocol handler (?)), it starts to send /test/0.0.0 because I reach here, but fails with the error "Cannot push value onto an ended pushable" (which makes sense, since the stream is about to end).

It is as if "/test/0.0.0" somehow gets stuck at

for await (const buf of source) {

I am testing now with the newest release (1.0.11)

marcus-pousette (Contributor, Author) commented Dec 19, 2023

I wonder if the race condition here is that both sides expect the other side to write some data first, so neither starts to negotiate before it's too late?
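
A toy model of that suspicion (hypothetical code, not libp2p internals): if both endpoints are 'lazy' and wait for the other side's first bytes before writing anything themselves, neither loop ever advances.

async function lazyEndpoint (incoming: AsyncIterable<Uint8Array>, send: (buf: Uint8Array) => void): Promise<void> {
  // park until the remote writes first...
  for await (const buf of incoming) {
    // ...which never happens, because the remote is parked in the
    // same loop waiting for us
    send(buf)
  }
}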

marcus-pousette (Contributor, Author) commented Dec 20, 2023

I think I have figured out the issue now:

Previously, a stream (inbound and outbound) could be set up without any data being written. This allowed one to dial and wait for the stream to be established without engaging in any activity. This is something I relied on in various places, since I previously assumed a half-open stream would be considered not ready, unwanted, or a sign that something had gone wrong.

The situation now is that I have to write data in order to set up a stream fully (read and write), i.e. my apps have to trust half-open streams to be 'ok' even though something could still go wrong down the line.

I need to understand, though, whether treating a stream as 'ready' when only the outbound direction exists will create problems further down the line. If the inbound stream is never successfully set up, I need a way of treating the whole stream as 'not ready'. This creates a scenario where I need to write some code to correctly handle the case where a half-open stream never successfully converts into a fully open stream, and to terminate the stream altogether when that happens; see the sketch below.
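
One way to write that guard - a sketch under assumptions: waitForInbound is a hypothetical helper that resolves when the registrar handler fires for this peer, and the 10s deadline is arbitrary:

import type { PeerId, Stream } from '@libp2p/interface'

async function ensureFullyOpen (
  outbound: Stream,
  peer: PeerId,
  waitForInbound: (peer: PeerId, opts: { signal: AbortSignal }) => Promise<Stream>
): Promise<void> {
  const signal = AbortSignal.timeout(10_000)

  try {
    await waitForInbound(peer, { signal })
  } catch {
    // the inbound half never materialised: treat the stream as
    // 'not ready' and tear down the half-open outbound side
    await outbound.close()
    throw new Error('stream never became fully open')
  }
}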

achingbrain (Member) commented:

This sounds like it's getting closer. From the recent performance work it became apparent that round trips are the thing that makes opening connections slow.

When there's only a single protocol being negotiated, in the 'open stream & write data' case we optimistically write the protocol and the first chunk of data into the transport together, saving a round trip of sending the protocol, reading the response, then sending the data - this is what lowers connection establishment time to around 200ms. Of course it needs the first data chunk to do this, so negotiation doesn't complete until the stream is interacted with.

In the 'open stream & read data' case it falls back to send the protocol, read the response, read some data, but only when the caller tries to read from the stream - so again negotiation doesn't complete until the stream is interacted with.

When there are multiple protocols being negotiated it has the older behaviour of immediately writing/reading protocol names/responses until one is agreed upon, since we can't send protocol+data and then send another protocol - the remote would interpret the data as the next protocol to be negotiated.

The intention here is that you open a stream, try to write or read data, and deal with any failures as they occur.

If the case is that you need to open streams but do nothing with them, we might need to add an option to allow ensuring they are fully negotiated immediately to support this.
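
A rough model of the write path described above (my simplification, not the actual multistream-select code - real mss length-prefixes its messages, which this skips): the protocol header is held back and flushed together with the first application chunk, so if the caller never writes, the header never goes out and the remote's mss.handle never resolves.

function optimisticSink (
  protocol: string,
  rawSink: (source: AsyncIterable<Uint8Array>) => Promise<void>
): (source: AsyncIterable<Uint8Array>) => Promise<void> {
  return async (source) => {
    await rawSink((async function * () {
      const encoder = new TextEncoder()
      let headerSent = false

      for await (const chunk of source) {
        if (!headerSent) {
          // first write: protocol name and first data chunk leave in
          // the same flush, saving a negotiation round trip
          yield encoder.encode(`${protocol}\n`)
          headerSent = true
        }
        yield chunk
      }
      // if the loop never ran, no header was sent and the remote is
      // still waiting in mss.handle
    })())
  }
}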

achingbrain added a commit that referenced this issue Dec 20, 2023
Adds an option to `libp2p.dialProtocol` and `connection.newStream` to disable optimistic protocol selection and default to waiting for negotiation to finish before sending data.

This increases the time-to-first-byte of a stream negotiated with a
single protocol but restores the previous behaviour.

Fixes #2321
marcus-pousette (Contributor, Author) commented:

Makes sense!

> If the case is that you need to open streams but do nothing with them, we might need to add an option to allow ensuring they are fully negotiated immediately to support this.

I wonder whether there is a scenario where not doing optimistic protocol selection is faster. Imagine a case where you dial a bunch of peers and only eventually want to send data to them. There might be idle time before data is sent, and that time could be used to negotiate the protocols. If you instead do protocol selection at the moment the first data is sent, delivery of the first message will be slower than if negotiation had been done beforehand.

I imagine something like this (pseudocode) solving both scenarios:

if (protocols.length === 1) {
  // fast path: write the protocol together with the first data chunk
  optimisticProtocolSelection()

  // if no data has been written after a short idle period, finish the
  // negotiation anyway so the stream becomes fully open
  setTimeout(() => {
    normalSelection()
  }, 100)
}

marcus-pousette (Contributor, Author) commented Dec 20, 2023

Another thing:

this line

seems to always make my code throw/log the error "Cannot push value onto an ended pushable" if I have created a stream, never used it, and am terminating it. I would expect no errors to be thrown if I terminate a stream without sending any data through it.

What is the purpose of negotiating protocols on a stream that is ending?

achingbrain (Member) commented:

It's possible that we don't need that line now, since #2318 has made closing streams mid-negotiation a bit more predictable.

achingbrain added a commit that referenced this issue Dec 28, 2023
Adds an option to `libp2p.dialProtocol` and `connection.newStream` to enable optimistic protocol selection, but defaults to waiting for negotiation to finish before sending data.

This increases the time-to-first-byte of a stream negotiated with a single protocol but restores the previous behaviour.

Fixes #2321

---------

Co-authored-by: Chad Nehemiah <chad.nehemiah94@gmail.com>
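
Usage would presumably look something like this. The negotiateFully option name is my assumption about what #2331 ships (check the PR for the final API), and node / connection stand for an existing libp2p instance and Connection:

// wait for negotiation to finish before sending data (the new default)
const stream = await node.dialProtocol(remotePeer, '/test/0.0.0', {
  negotiateFully: true
})

// opt back in to the optimistic fast path for single-protocol streams
const fastStream = await connection.newStream('/test/0.0.0', {
  negotiateFully: false
})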