-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implemented PING fully-featured #409
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,13 @@ | ||
import logging | ||
import math | ||
import secrets | ||
import time | ||
from typing import Union | ||
|
||
import trio | ||
|
||
from libp2p.exceptions import ValidationError | ||
from libp2p.host.host_interface import IHost | ||
from libp2p.network.stream.exceptions import StreamClosed, StreamEOF, StreamReset | ||
from libp2p.network.stream.net_stream_interface import INetStream | ||
from libp2p.peer.id import ID as PeerID | ||
|
@@ -14,6 +20,21 @@ | |
logger = logging.getLogger("libp2p.host.ping") | ||
|
||
|
||
async def handle_ping(stream: INetStream) -> None: | ||
"""``handle_ping`` responds to incoming ping requests until one side errors | ||
or closes the ``stream``.""" | ||
peer_id = stream.muxed_conn.peer_id | ||
|
||
while True: | ||
try: | ||
should_continue = await _handle_ping(stream, peer_id) | ||
if not should_continue: | ||
return | ||
except Exception: | ||
await stream.reset() | ||
return | ||
|
||
|
||
async def _handle_ping(stream: INetStream, peer_id: PeerID) -> bool: | ||
"""Return a boolean indicating if we expect more pings from the peer at | ||
``peer_id``.""" | ||
|
@@ -45,16 +66,65 @@ async def _handle_ping(stream: INetStream, peer_id: PeerID) -> bool: | |
return True | ||
|
||
|
||
async def handle_ping(stream: INetStream) -> None: | ||
"""``handle_ping`` responds to incoming ping requests until one side errors | ||
or closes the ``stream``.""" | ||
peer_id = stream.muxed_conn.peer_id | ||
class PingService: | ||
"""PingService executes pings and returns RTT in miliseconds.""" | ||
|
||
while True: | ||
def __init__(self, host: IHost): | ||
self._host = host | ||
|
||
async def ping(self, peer_id: PeerID) -> int: | ||
stream = await self._host.new_stream(peer_id, (ID,)) | ||
try: | ||
should_continue = await _handle_ping(stream, peer_id) | ||
if not should_continue: | ||
return | ||
rtt = await _ping(stream) | ||
await _close_stream(stream) | ||
return rtt | ||
except Exception: | ||
await stream.reset() | ||
return | ||
await _close_stream(stream) | ||
raise | ||
|
||
async def ping_loop( | ||
self, peer_id: PeerID, ping_amount: Union[int, float] = math.inf | ||
) -> "PingIterator": | ||
stream = await self._host.new_stream(peer_id, (ID,)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be helpful to leave a docstring describing why we have it turns out that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, it is a good idea to write a docstring! The reason why I thought of separating into two methods is because If am open to discussion whether it really is worth it or useful or not :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also the loop is useful in case you want to add some more functionality between each ping. (or maybe it is not a good feature?) However, I have been thinking and maybe it is a good idea to add a second argument to the |
||
ping_iterator = PingIterator(stream, ping_amount) | ||
return ping_iterator | ||
|
||
|
||
class PingIterator: | ||
def __init__(self, stream: INetStream, ping_amount: Union[int, float]): | ||
self._stream = stream | ||
self._ping_limit = ping_amount | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think want to just use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like it! I'll change it to |
||
self._ping_counter = 0 | ||
|
||
def __aiter__(self) -> "PingIterator": | ||
return self | ||
|
||
async def __anext__(self) -> int: | ||
if self._ping_counter > self._ping_limit: | ||
await _close_stream(self._stream) | ||
raise StopAsyncIteration | ||
|
||
self._ping_counter += 1 | ||
try: | ||
return await _ping(self._stream) | ||
except trio.EndOfChannel: | ||
await _close_stream(self._stream) | ||
raise StopAsyncIteration | ||
|
||
|
||
async def _ping(stream: INetStream) -> int: | ||
ping_bytes = secrets.token_bytes(PING_LENGTH) | ||
before = int(time.time() * 10 ** 6) # convert float of seconds to int miliseconds | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what do you think about just keeping the native There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. First of all thank you for your comments. I learn a lot from them! I think it is a good idea to keep it as float. Do you think it is better to leave it also as native seconds? I think I prefer to return it as a miliseconds float |
||
await stream.write(ping_bytes) | ||
pong_bytes = await stream.read(PING_LENGTH) | ||
rtt = int(time.time() * 10 ** 6) - before | ||
if ping_bytes != pong_bytes: | ||
raise ValidationError("Invalid PING response") | ||
return rtt | ||
|
||
|
||
async def _close_stream(stream: INetStream) -> None: | ||
try: | ||
await stream.close() | ||
except Exception: | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. worth at least logging this exception, if not letting bubble up to some caller There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you are right, it is better to let bubble up to the caller |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any particular reason to move this chunk of code up here?
just curious.... at first pass on this review i figured you had changed some of the logic but it seems to be the same(!)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I'm sorry, I think this happened because at first I moved it inside
PingService
as a method. But at the end I left it out, and I think I moved it accidentally