Skip to content

Commit

Permalink
Merge branch 'async-for-pr3504' of https://github.com/moodyjon/lbry-sdk
Browse files Browse the repository at this point in the history
… into async-for-pr3504
  • Loading branch information
moodyjon committed May 16, 2022
2 parents c2dad77 + 181b166 commit c8f1aa1
Show file tree
Hide file tree
Showing 19 changed files with 490 additions and 40 deletions.
4 changes: 4 additions & 0 deletions lbry/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,10 @@ class Config(CLIConfig):
('cdn.reflector.lbry.com', 5567)
])

tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
('tracker.lbry.com', 1337)
])

lbryum_servers = Servers("SPV wallet servers", [
('spv11.lbry.com', 50001),
('spv12.lbry.com', 50001),
Expand Down
49 changes: 49 additions & 0 deletions lbry/extras/daemon/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from lbry.torrent.torrent_manager import TorrentManager
from lbry.wallet import WalletManager
from lbry.wallet.usage_payment import WalletServerPayer
from lbry.torrent.tracker import TrackerClient

try:
from lbry.torrent.session import TorrentSession
except ImportError:
Expand All @@ -48,6 +50,7 @@
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
TRACKER_ANNOUNCER_COMPONENT = "tracker_announcer_component"
LIBTORRENT_COMPONENT = "libtorrent_component"


Expand Down Expand Up @@ -708,3 +711,49 @@ async def start(self):

async def stop(self):
self.exchange_rate_manager.stop()


class TrackerAnnouncerComponent(Component):
component_name = TRACKER_ANNOUNCER_COMPONENT
depends_on = [FILE_MANAGER_COMPONENT]

def __init__(self, component_manager):
super().__init__(component_manager)
self.file_manager = None
self.announce_task = None
self.tracker_client: typing.Optional[TrackerClient] = None

@property
def component(self):
return self.tracker_client

@property
def running(self):
return self._running and self.announce_task and not self.announce_task.done()

async def announce_forever(self):
while True:
sleep_seconds = 60.0
announce_sd_hashes = []
for file in self.file_manager.get_filtered():
if not file.downloader:
continue
announce_sd_hashes.append(bytes.fromhex(file.sd_hash))
await self.tracker_client.announce_many(*announce_sd_hashes)
await asyncio.sleep(sleep_seconds)

async def start(self):
node = self.component_manager.get_component(DHT_COMPONENT) \
if self.component_manager.has_component(DHT_COMPONENT) else None
node_id = node.protocol.node_id if node else None
self.tracker_client = TrackerClient(node_id, self.conf.tcp_port, lambda: self.conf.tracker_servers)
await self.tracker_client.start()
self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
self.announce_task = asyncio.create_task(self.announce_forever())

async def stop(self):
self.file_manager = None
if self.announce_task and not self.announce_task.done():
self.announce_task.cancel()
self.announce_task = None
self.tracker_client.stop()
25 changes: 16 additions & 9 deletions lbry/extras/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from lbry.extras import system_info
from lbry.extras.daemon import analytics
from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT, DISK_SPACE_COMPONENT
from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT, DISK_SPACE_COMPONENT, TRACKER_ANNOUNCER_COMPONENT
from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT
from lbry.extras.daemon.componentmanager import RequiredCondition
from lbry.extras.daemon.componentmanager import ComponentManager
Expand Down Expand Up @@ -4949,7 +4949,6 @@ async def jsonrpc_blob_delete(self, blob_hash):
DHT / Blob Exchange peer commands.
"""

@requires(DHT_COMPONENT)
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
"""
Get peers for blob hash
Expand All @@ -4971,21 +4970,29 @@ async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
if not is_valid_blobhash(blob_hash):
# TODO: use error from lbry.error
raise Exception("invalid blob hash")
peers = []
peer_q = asyncio.Queue(loop=self.component_manager.loop)
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
if self.component_manager.has_component(TRACKER_ANNOUNCER_COMPONENT):
tracker = self.component_manager.get_component(TRACKER_ANNOUNCER_COMPONENT)
tracker_peers = await tracker.get_kademlia_peer_list(bytes.fromhex(blob_hash))
log.info("Found %d peers for %s from trackers.", len(tracker_peers), blob_hash[:8])
peer_q.put_nowait(tracker_peers)
elif not self.component_manager.has_component(DHT_COMPONENT):
raise Exception("Peer list needs, at least, either a DHT component or a Tracker component for discovery.")
peers = []
if self.component_manager.has_component(DHT_COMPONENT):
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
while not peer_q.empty():
peers.extend(peer_q.get_nowait())
results = [
{
"node_id": hexlify(peer.node_id).decode(),
results = {
(peer.address, peer.tcp_port): {
"node_id": hexlify(peer.node_id).decode() if peer.node_id else None,
"address": peer.address,
"udp_port": peer.udp_port,
"tcp_port": peer.tcp_port,
}
for peer in peers
]
return paginate_list(results, page, page_size)
}
return paginate_list(list(results.values()), page, page_size)

@requires(DATABASE_COMPONENT)
async def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None):
Expand Down
1 change: 1 addition & 0 deletions lbry/file/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
self.purchase_receipt = None
self._added_on = added_on
self.analytics_manager = analytics_manager
self.downloader = None

self.saving = asyncio.Event(loop=self.loop)
self.finished_writing = asyncio.Event(loop=self.loop)
Expand Down
3 changes: 3 additions & 0 deletions lbry/stream/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from lbry.utils import lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader
from lbry.torrent.tracker import enqueue_tracker_search

if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.dht.node import Node
Expand Down Expand Up @@ -91,6 +93,7 @@ async def start(self, node: typing.Optional['Node'] = None, connection_id: int =
self.accumulate_task.cancel()
_, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
await self.add_fixed_peers()
enqueue_tracker_search(bytes.fromhex(self.sd_hash), self.peer_queue)
# start searching for peers for the sd hash
self.search_queue.put_nowait(self.sd_hash)
log.info("searching for peers for stream %s", self.sd_hash)
Expand Down
2 changes: 0 additions & 2 deletions lbry/stream/managed_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.schema.claim import Claim
from lbry.blob.blob_manager import BlobManager
from lbry.blob.blob_info import BlobInfo
from lbry.dht.node import Node
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.wallet.transaction import Transaction

Expand Down
Loading

0 comments on commit c8f1aa1

Please sign in to comment.