This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

The abortable source hangs in the sink #120

Closed
filoozom opened this issue Apr 18, 2021 · 8 comments

Comments

@filoozom

Hi,

I need to be able to close the write stream independently of the read stream for a project that communicates with libp2p nodes written in Go. I tried out #115, but I don't think it works: it seems to just create a new stream and close that one instead of the current one.

I thus wanted to replace the closeWrite function with a third AbortController, so that I could skip the generator's catch and send the Types.CLOSE message.

However, while doing this I noticed that I never reach the catch with AbortErrors, because it hangs on the following line in abortable-iterator: https://github.com/alanshaw/abortable-iterator/blob/master/index.js#L60. So neither abortController nor resetController works for me, and neither does the third one I wanted to add.

I'm not familiar enough with async generators to know if there's a simple solution to this issue; I just know that commenting out the https://github.com/alanshaw/abortable-iterator/blob/master/index.js#L60 line makes it work. Do you have any suggestions?
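For illustration, here is a minimal sketch (hypothetical code, not abortable-iterator's actual implementation) of the pattern involved: unless each read is raced against the abort signal, a source that is sitting idle leaves the consumer stuck on an await forever.

```javascript
// Hypothetical sketch of an abortable source wrapper. The key point: each
// it.next() is raced against a promise that rejects on abort, so an idle
// source cannot leave the consumer hanging on the await.
const abortable = (source, signal) => (async function * () {
  const aborted = new Promise((resolve, reject) => {
    signal.addEventListener('abort', () => reject(new Error('AbortError')), { once: true })
  })
  aborted.catch(() => {}) // avoid an unhandled rejection if never raced
  const it = source[Symbol.asyncIterator]()
  while (true) {
    const { done, value } = await Promise.race([it.next(), aborted])
    if (done) return
    yield value
  }
})()

async function consumeWithAbort () {
  // A source that never yields, simulating an idle stream.
  const idleSource = (async function * () {
    await new Promise(() => {}) // intentionally never settles
  })()
  const controller = new AbortController()
  setTimeout(() => controller.abort(), 10)
  try {
    for await (const value of abortable(idleSource, controller.signal)) {
      void value
    }
    return 'completed'
  } catch (err) {
    return err.message
  }
}

consumeWithAbort().then(console.log) // prints: AbortError
```

Without the Promise.race, the loop would sit on `it.next()` indefinitely, which matches the hang described above.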

@vasco-santos
Member

Hello @filoozom

Thanks for reaching out.

> as it seems like it's just creating a new stream and closing that one instead of the current one.

I am not entirely sure I understand you here.

Once a libp2p connection exists, streams can be opened through that connection using a stream multiplexer. When a libp2p stream opens, multiplexer.newStream() is called. This will use the given stream multiplexer algorithm to multiplex several streams within the same raw connection.

A connection can have long-lived streams or short-lived streams, which should also support being partially closed (using closeRead + closeWrite). Once we support these features, users will be able to individually close either side of the async iterator flow.
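A rough sketch of that shape, with one AbortController per direction, so either half can be closed without touching the other (illustrative names only, not the libp2p-mplex API):

```javascript
// Hypothetical sketch: a duplex stream where each direction has its own
// AbortController, so one half can be closed independently of the other.
function duplexStream () {
  const readCloser = new AbortController()
  const writeCloser = new AbortController()
  return {
    closeRead: () => readCloser.abort(),   // stop consuming inbound data
    closeWrite: () => writeCloser.abort(), // finish the outbound half only
    readClosed: () => readCloser.signal.aborted,
    writeClosed: () => writeCloser.signal.aborted
  }
}

const stream = duplexStream()
stream.closeWrite()
console.log(stream.writeClosed(), stream.readClosed()) // true false
```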


I updated #115 to #121, as we are planning on getting that work finished soon. But I want to make sure we move forward with a solution that works for all the use cases.

Can you provide a simple code snippet of how you expect to interact with this (from libp2p), as well as a test scenario that reproduces the issue?

@filoozom
Author

filoozom commented Apr 19, 2021

> Hello @filoozom
>
> Thanks for reaching out.
>
> > as it seems like it's just creating a new stream and closing that one instead of the current one.
>
> I am not entirely sure I understand you here.

I created a simple test case here: https://github.com/filoozom/js-libp2p-mplex-pr-115. The issue might come from my writeStream and readStream functions; I'm not sure, as I don't have a lot of experience with iterable streams.

With DEBUG=* node listener.js, we can see:

  libp2p:mplex incoming message { id: 1, type: 'MESSAGE_INITIATOR', data: <Buffer 03 73 79 6e> } +44ms
  libp2p:mplex receiver stream 1 1 send { id: 1, type: 'CLOSE_RECEIVER', data: undefined } +1ms
  libp2p:mplex:stream receiver stream 1 sink end undefined +45ms
  libp2p:mplex receiver stream 1 1 send {
  id: 1,
  type: 'MESSAGE_RECEIVER',
  data: <Buffer 06 73 79 6e 61 63 6b>
} +0ms
  libp2p:mplex incoming message { id: 1, type: 'NEW_STREAM', data: <Buffer 31> } +3ms
  libp2p:mplex error in sink Error: receiver stream 1 already exists!
    at Mplex._newStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:117:13)
    at Mplex._newReceiverStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:101:17)
    at Mplex._handleIncoming (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:211:27)
    at /home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:160:22
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
    at async Mplex.sink (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:153:9) +0ms
  libp2p:mplex:stream receiver stream 1 abort Error: receiver stream 1 already exists!
    at Mplex._newStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:117:13)
    at Mplex._newReceiverStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:101:17)
    at Mplex._handleIncoming (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:211:27)
    at /home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:160:22
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
    at async Mplex.sink (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:153:9) +3ms
  libp2p:mplex:stream receiver stream 1 source end Error: receiver stream 1 already exists!
    at Mplex._newStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:117:13)
    at Mplex._newReceiverStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:101:17)
    at Mplex._handleIncoming (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:211:27)
    at /home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:160:22
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
    at async Mplex.sink (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:153:9) +0ms
  libp2p:mplex receiver stream 1 1 ended +1ms
  libp2p:tcp:socket Error: receiver stream 1 already exists!
  libp2p:tcp:socket     at Mplex._newStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:117:13)
  libp2p:tcp:socket     at Mplex._newReceiverStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:101:17)
  libp2p:tcp:socket     at Mplex._handleIncoming (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:211:27)
  libp2p:tcp:socket     at /home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:160:22
  libp2p:tcp:socket     at processTicksAndRejections (internal/process/task_queues.js:93:5)
  libp2p:tcp:socket     at async Mplex.sink (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:153:9) +0ms
(node:2842) UnhandledPromiseRejectionWarning: Error: receiver stream 1 already exists!
    at Mplex._newStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:117:13)
    at Mplex._newReceiverStream (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:101:17)
    at Mplex._handleIncoming (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:211:27)
    at /home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:160:22
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
    at async Mplex.sink (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:153:9)
(Use `node --trace-warnings ...` to show where the warning was created)
(node:2842) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 2)
(node:2842) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

and with DEBUG=* on the dialer side:

  mss:select select: read "/close/1.0.0" +1ms
  libp2p:mplex initiator stream 1 1 send { id: 1, type: 'MESSAGE_INITIATOR', data: <Buffer 03 73 79 6e> } +2ms
  libp2p:peer-store:address-book stored provided peer record for QmcrQZ6RJdpYuGvZqD5QEHAv6qX4BrQLJLQPQUrTrzdcgm +284ms
  libp2p:peer-store:proto-book stored provided protocols for QmcrQZ6RJdpYuGvZqD5QEHAv6qX4BrQLJLQPQUrTrzdcgm +291ms
  libp2p:mplex incoming message { id: 1, type: 'CLOSE_RECEIVER', data: <Buffer > } +48ms
  libp2p:mplex:stream initiator stream 1 source end undefined +50ms
  libp2p:mplex initiator stream 1 1 send { id: 1, type: 'NEW_STREAM', data: '1' } +0ms
  libp2p:mplex initiator stream 1 1 send { id: 1, type: 'CLOSE_INITIATOR', data: undefined } +0ms
  libp2p:mplex:stream initiator stream 1 sink end undefined +0ms
  libp2p:mplex initiator stream 1 1 ended +1ms
{ open: 1618828936606, close: 1618828936754 }
  libp2p:mplex:stream initiator stream 1 error Error: the stream is not in the muxer registry, it may have already been closed
    at send (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/mplex.js:122:23)
    at Object.sink (/home/filoozom/projects/swarm/operators/pr-115/node_modules/libp2p-mplex/src/stream.js:86:15)
    at processTicksAndRejections (internal/process/task_queues.js:93:5) {
  code: 'ERR_STREAM_DOESNT_EXIST'
} +2ms
  libp2p:mplex incoming message {
  id: 1,
  type: 'MESSAGE_RECEIVER',
  data: <Buffer 06 73 79 6e 61 63 6b>
} +2ms
  libp2p:mplex missing stream 1 +0ms
  libp2p:mplex error in sink Error: read ECONNRESET
    at TCP.onStreamRead (internal/stream_base_commons.js:209:20) {
  errno: -104,
  code: 'ECONNRESET',
  syscall: 'read'
} +1ms
  libp2p:tcp:socket Error: read ECONNRESET
  libp2p:tcp:socket     at TCP.onStreamRead (internal/stream_base_commons.js:209:20) +0ms

As far as I can tell, the dialer sends a message on stream 1, then creates a new stream with the same ID and closes it afterwards. The listener is not happy because a stream with that ID already exists.

I'm assuming that this is because this line:

closeWrite: () => stream.sink([]),

calls the entire sink function again, and thus also executes send({ id, type: Types.NEW_STREAM, data: name }) a second time. This is why I wanted to implement a new version of this using a third AbortController.
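The failure mode being described can be sketched like this (hypothetical names loosely mirroring libp2p-mplex, not its actual code): because the sink announces the stream on every call, reusing it as closeWrite makes the remote see a second NEW_STREAM for the same id.

```javascript
// Every call to sink() announces the stream, so using sink([]) as
// closeWrite() makes the remote see a duplicate NEW_STREAM for this id.
function createStream ({ id, name, send }) {
  const sink = async (source) => {
    send('NEW_STREAM')               // sent on *every* sink() call
    for await (const data of source) {
      void data
      send('MESSAGE')
    }
    send('CLOSE')
  }
  return { sink, closeWrite: () => sink([]) }
}

async function demo () {
  const sent = []
  const stream = createStream({ id: 1, name: '1', send: (type) => sent.push(type) })
  await stream.sink(['hello'])  // NEW_STREAM, MESSAGE, CLOSE
  await stream.closeWrite()     // re-runs sink: duplicate NEW_STREAM, CLOSE
  return sent
}

demo().then(console.log) // [ 'NEW_STREAM', 'MESSAGE', 'CLOSE', 'NEW_STREAM', 'CLOSE' ]
```

The duplicate NEW_STREAM at the end is what triggers the "receiver stream 1 already exists!" error in the listener log above.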

Sorry if I'm off base. I also just noticed that the listener sends CLOSE_RECEIVER before NEW_STREAM, so this might be the issue. Although even when delaying closeWrite() by 100 ms, the messages are sent in the right order but a new stream is still opened with the same ID.

@vasco-santos
Member

Thanks @filoozom, will look into this today and get back to you.

@filoozom
Author

filoozom commented Apr 19, 2021

This is how I got it to work for my needs: beejeez/js-libp2p-mplex@3db21df (not sure if it's entirely spec-compliant).

@vasco-santos
Member

vasco-santos commented Apr 19, 2021

You are completely right. I can see the problem with #121 and the previous PR: they only work correctly if closeWrite is the first thing to happen. However, if a source is being consumed and closeWrite is then triggered, a NEW_STREAM message is sent to the remote.

Looking at your proposed solution, I think we only need to take care of one extra thing, which is the case that currently works. That means we should probably keep state on whether sink was previously called or not. If sink was not called, closeWrite would do stream.sink([]); otherwise we would abort the ongoing sink.

What do you think? This probably means the current AbortController implementation would work?

Edit: if so, probably the best approach would be to create a simple MuxedStream class that keeps track of whether a sink is ongoing or not. Would you like to create a new PR with the changes from #121 plus the abort controller like this? It would also be good to have a follow-up version of libp2p/js-libp2p-interfaces#90 with a test that writes and then calls closeWrite.
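A minimal sketch of that idea (hypothetical, not the eventual implementation): remember whether sink() has run, and have closeWrite() either drain an empty source or abort the live sink.

```javascript
class MuxedStream {
  constructor (send) {
    this.send = send
    this.sinkCalled = false
    this.closeController = new AbortController()
  }

  async sink (source) {
    this.sinkCalled = true
    this.send('NEW_STREAM')
    for await (const data of source) {
      void data
      // A real implementation must also race the abort signal against the
      // pending read (as abortable-iterator does), not just check between items.
      if (this.closeController.signal.aborted) break
      this.send('MESSAGE')
    }
    this.send('CLOSE')
  }

  closeWrite () {
    if (!this.sinkCalled) {
      // sink never ran: drain an empty source so NEW_STREAM/CLOSE are
      // still sent exactly once.
      return this.sink([])
    }
    // sink is already running: tell it to stop instead of re-running it.
    this.closeController.abort()
  }
}

async function demo () {
  const sent = []
  const stream = new MuxedStream((type) => sent.push(type))
  await stream.closeWrite() // no prior sink() call
  return sent
}

demo().then(console.log) // [ 'NEW_STREAM', 'CLOSE' ]
```

Either path sends NEW_STREAM at most once, avoiding the duplicate-stream error from the earlier log.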

@filoozom
Author

> What do you think? This probably means the current AbortController implementation would work?

Sounds good, do you want me to open a PR, or do you want to edit #121?

The issue I'm having with abortable-iterator is that it waits for something to happen with await resume(). Basically, if abort() is called and I don't write anything into the sink, it doesn't abort. Maybe something like:

writeCloseController.abort()
pipe([], sink)

could work, but then we'd need a reference to the created sink, or something like a PassThrough I guess, and it doesn't seem clean.

I don't know how this should be fixed. Apparently the await resume() is related to async generators, but I haven't looked into it much yet.
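To illustrate the hang (a sketch, not abortable-iterator's code): a wrapper that only inspects the signal between items never notices an abort while the source is idle, which is why aborting without writing anything into the sink has no effect.

```javascript
// Naive wrapper: the signal is only inspected after the next item arrives,
// so an idle source means the abort is never observed.
async function * naiveAbortable (source, signal) {
  for await (const value of source) { // can await here forever
    if (signal.aborted) throw new Error('AbortError')
    yield value
  }
}

async function demo () {
  const idle = (async function * () {
    await new Promise(() => {}) // intentionally never settles
  })()
  const controller = new AbortController()
  controller.abort() // aborted up front, yet still not seen below

  const consume = (async () => {
    for await (const value of naiveAbortable(idle, controller.signal)) {
      void value
    }
    return 'done'
  })()
  // Race the consumer against a timeout to show it never finishes.
  const timeout = new Promise(resolve => setTimeout(() => resolve('hung'), 50))
  return Promise.race([consume, timeout])
}

demo().then(console.log) // prints: hung
```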

@vasco-santos
Member

> Sounds good, do you want me to open a PR, or do you want to edit #121?

Open a new PR; you might not have permissions to edit mine.

> The issue I'm having with abortable-iterator is that it waits for something to happen

It should be waiting for the next occurrence. We should be able to just abort. But go ahead and create the PR even if the test fails, and I can help you debug it.

achingbrain added a commit that referenced this issue May 19, 2022
This also now throws an error when a write is attempted on a non-existent stream. Previously we would just send the message, but this is against the mplex protocol.

Refs: #120
Supersedes: #115
achingbrain added a commit that referenced this issue May 23, 2022
This also now throws an error when a write is attempted on a non-existent stream. Previously we would just send the message, but this is against the mplex protocol.

Refs: #120
Supersedes: #115
github-actions bot pushed a commit that referenced this issue May 23, 2022
## [1.1.0](v1.0.5...v1.1.0) (2022-05-23)

### Features

* close read and write streams (#170) (3917968), closes #120, #115
@mpetrunic
Member

resolved via #170
