Skip to content

Commit

Permalink
Merge pull request #7286 from drew2a/fix/6131
Browse files Browse the repository at this point in the history
`Torrent Checker` refactoring
  • Loading branch information
kozlovsky authored Feb 20, 2023
2 parents cf5d52b + 2bfc9da commit 3e5f265
Show file tree
Hide file tree
Showing 35 changed files with 817 additions and 784 deletions.
40 changes: 22 additions & 18 deletions doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,43 @@
import logging
import os
import sys
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logging.info('Start to execute conf.py')

# pylint: disable=wrong-import-position
# root_dir = Path(__file__).parent.parent

root_dir = os.path.abspath(os.path.join(os.path.dirname(__name__), '..'))
tribler_components = [
tribler_source_dirs = [
os.path.join(root_dir, "src"),
os.path.join(root_dir, "doc"),
]
for component in tribler_components:
logging.info(f'Add component: {component}')

sys.path.append(str(component))
for source_dir in tribler_source_dirs:
logging.info(f'Add source dir: {source_dir}')
sys.path.append(str(source_dir))

# pylint: disable=wrong-import-position
from tribler.core.utilities.patch_import import patch_import
from tribler.core.utilities.dependencies import Scope, get_dependencies

# patch extra imports that can not be extracted from the `requirements-core.txt` file
with patch_import(modules={'libtorrent', 'validate', 'file_read_backwards'}):
from tribler.core.utilities.dependencies import Scope, get_dependencies
# extra modules that can't be extracted from the `requirements-core.txt` file
modules = [
'validate', # automatically installed alongside with configobj 5.x, should be fixed in configobj 6.0
]
modules.extend(set(get_dependencies(scope=Scope.core)))

# patch imports that can be extracted from the `requirements-core.txt` file
with patch_import(modules=set(get_dependencies(scope=Scope.core))):
from tribler.core.components.restapi.rest.root_endpoint import RootEndpoint
with patch_import(modules):
from tribler.core.components.restapi.rest.root_endpoint import RootEndpoint

add_endpoint = RootEndpoint.add_endpoint
RootEndpoint.add_endpoint = lambda self, path, ep: add_endpoint(self, path, ep) \
if path not in ['/ipv8', '/market', '/wallets'] else None
add_endpoint = RootEndpoint.add_endpoint
RootEndpoint.add_endpoint = lambda self, path, ep: add_endpoint(self, path, ep) \
if path not in ['/ipv8', '/market', '/wallets'] else None

# Extract Swagger docs
from extract_swagger import extract_swagger
# Extract Swagger docs
from extract_swagger import extract_swagger

asyncio.run(extract_swagger('restapi/swagger.yaml'))
asyncio.run(extract_swagger('restapi/swagger.yaml'))

# -- General configuration ------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions requirements-core.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ pyipv8==2.10.0
libtorrent==1.2.15
file-read-backwards==2.0.0
Brotli==1.0.9 # to prevent AttributeError on macOs: module 'brotli' has no attribute 'error' (in urllib3.response)
human-readable==1.3.2
13 changes: 11 additions & 2 deletions src/run_tribler_headless.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
This script enables you to start Tribler headless.
"""
import argparse
import logging
import os
import re
import signal
Expand All @@ -12,8 +13,9 @@
from socket import inet_aton
from typing import Optional

from tribler.core.config.tribler_config import TriblerConfig
from tribler.core.components.session import Session
from tribler.core.config.tribler_config import TriblerConfig
from tribler.core.start_core import components_gen
from tribler.core.utilities.osutils import get_appstate_dir, get_root_state_directory
from tribler.core.utilities.path_util import Path
from tribler.core.utilities.process_manager import ProcessKind, ProcessManager, TriblerProcess, \
Expand Down Expand Up @@ -107,7 +109,7 @@ async def signal_handler(sig):
config.chant.testnet = True
config.bandwidth_accounting.testnet = True

self.session = Session(config)
self.session = Session(config, components=list(components_gen(config)))
try:
await self.session.start_components()
except Exception as e:
Expand All @@ -117,6 +119,11 @@ async def signal_handler(sig):
print("Tribler started")


def setup_logger(verbosity):
logging_level = logging.DEBUG if verbosity else logging.INFO
logging.basicConfig(level=logging_level)


def main(argv):
parser = argparse.ArgumentParser(add_help=False, description=('Tribler script, starts Tribler as a service'))
parser.add_argument('--help', '-h', action='help', default=argparse.SUPPRESS,
Expand All @@ -129,8 +136,10 @@ def main(argv):
help='Force the usage of specific IPv8 bootstrap server (ip:port)', action=IPPortAction)

parser.add_argument('--testnet', '-t', action='store_const', default=False, const=True, help='Join the testnet')
parser.add_argument('-v', '--verbosity', help='increase output verbosity', action='store_true')

args = parser.parse_args(sys.argv[1:])
setup_logger(args.verbosity)
service = TriblerService()

loop = get_event_loop()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import math
import time
from asyncio import Future
from typing import Awaitable

from ipv8.taskmanager import TaskManager

from tribler.core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
from tribler.core.utilities.unicode import hexlify


Expand All @@ -18,18 +21,17 @@ def __init__(self, lt_session):
:param lt_session: The session used to perform health lookups.
"""
TaskManager.__init__(self)
self.lookup_futures = {} # Map from binary infohash to future
self.bf_seeders = {} # Map from infohash to (final) seeders bloomfilter
self.bf_peers = {} # Map from infohash to (final) peers bloomfilter
self.outstanding = {} # Map from transaction_id to infohash
self.lookup_futures = {} # Map from binary infohash to future
self.bf_seeders = {} # Map from infohash to (final) seeders bloomfilter
self.bf_peers = {} # Map from infohash to (final) peers bloomfilter
self.outstanding = {} # Map from transaction_id to infohash
self.lt_session = lt_session

def get_health(self, infohash, timeout=15):
def get_health(self, infohash, timeout=15) -> Awaitable[HealthInfo]:
"""
Lookup the health of a given infohash.
:param infohash: The 20-byte infohash to lookup.
:param timeout: The timeout of the lookup.
:return: A Future that fires with a tuple, indicating the number of seeders and peers respectively.
"""
if infohash in self.lookup_futures:
return self.lookup_futures[infohash]
Expand Down Expand Up @@ -63,13 +65,8 @@ def finalize_lookup(self, infohash):
seeders = DHTHealthManager.get_size_from_bloomfilter(bf_seeders)
peers = DHTHealthManager.get_size_from_bloomfilter(bf_peers)
if not self.lookup_futures[infohash].done():
self.lookup_futures[infohash].set_result({
"DHT": [{
"infohash": hexlify(infohash),
"seeders": seeders,
"leechers": peers
}]
})
health = HealthInfo(infohash, last_check=int(time.time()), seeders=seeders, leechers=peers)
self.lookup_futures[infohash].set_result(health)

self.lookup_futures.pop(infohash, None)

Expand All @@ -94,6 +91,7 @@ def get_size_from_bloomfilter(bf):
:param bf: The bloom filter of which we estimate the size.
:return: A rounded integer, approximating the number of items in the filter.
"""

def tobits(s):
result = []
for c in s:
Expand Down Expand Up @@ -129,7 +127,7 @@ def requesting_bloomfilters(self, transaction_id, infohash):
# Libtorrent is reusing the transaction_id, and is now using it for a infohash that we're not interested in.
self.outstanding.pop(transaction_id, None)

def received_bloomfilters(self, transaction_id, bf_seeds=bytearray(256), bf_peers=bytearray(256)):
def received_bloomfilters(self, transaction_id, bf_seeds=bytearray(256), bf_peers=bytearray(256)):
"""
We have received bloom filters from the libtorrent DHT. Register the bloom filters and process them.
:param transaction_id: The ID of the query for which we are receiving the bloom filter.
Expand Down
26 changes: 13 additions & 13 deletions src/tribler/core/components/libtorrent/download_manager/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self,
self.tdef = tdef
self.handle: Optional[lt.torrent_handle] = None
self.state_dir = state_dir
self.dlmgr = download_manager
self.download_manager = download_manager
self.download_defaults = download_defaults or DownloadDefaultsSettings()
self.notifier = notifier

Expand Down Expand Up @@ -127,7 +127,7 @@ def register_alert_handler(self, alert_type: str, handler: lt.torrent_handle):
self.alert_handlers[alert_type].append(handler)

def wait_for_alert(self, success_type: str, success_getter: Optional[Callable[[Any], Any]] = None,
fail_type: str = None, fail_getter: Optional[Callable[[Any], Any]] = None):
fail_type: str = None, fail_getter: Optional[Callable[[Any], Any]] = None) -> Future:
future = Future()
if success_type:
self.futures[success_type].append((future, future.set_result, success_getter))
Expand Down Expand Up @@ -213,7 +213,7 @@ def on_add_torrent_alert(self, alert: lt.add_torrent_alert):
self.pause_after_next_hashcheck = user_stopped

# Limit the amount of connections if we have specified that
self.handle.set_max_connections(self.dlmgr.config.max_connections_download)
self.handle.set_max_connections(self.download_manager.config.max_connections_download)

# By default don't apply the IP filter
self.apply_ip_filter(False)
Expand Down Expand Up @@ -297,7 +297,7 @@ def on_save_resume_data_alert(self, alert: lt.save_resume_data_alert):

# Save it to file
basename = hexlify(resume_data[b'info-hash']) + '.conf'
filename = self.dlmgr.get_checkpoint_dir() / basename
filename = self.download_manager.get_checkpoint_dir() / basename
self.config.config['download_defaults']['name'] = self.tdef.get_name_as_unicode() # store name (for debugging)
try:
self.config.write(str(filename))
Expand Down Expand Up @@ -381,25 +381,25 @@ def on_metadata_received_alert(self, alert: lt.metadata_received_alert):
def on_performance_alert(self, alert: lt.performance_alert):
self._logger.info(f'On performance alert: {alert}')

if self.get_anon_mode() or self.dlmgr.ltsessions is None:
if self.get_anon_mode() or self.download_manager.ltsessions is None:
return

# When the send buffer watermark is too low, double the buffer size to a
# maximum of 50MiB. This is the same mechanism as Deluge uses.
lt_session = self.dlmgr.get_session(self.config.get_hops())
settings = self.dlmgr.get_session_settings(lt_session)
lt_session = self.download_manager.get_session(self.config.get_hops())
settings = self.download_manager.get_session_settings(lt_session)
if alert.message().endswith("send buffer watermark too low (upload rate will suffer)"):
if settings['send_buffer_watermark'] <= 26214400:
self._logger.info("Setting send_buffer_watermark to %s", 2 * settings['send_buffer_watermark'])
settings['send_buffer_watermark'] *= 2
self.dlmgr.set_session_settings(self.dlmgr.get_session(), settings)
self.download_manager.set_session_settings(self.download_manager.get_session(), settings)
# When the write cache is too small, double the buffer size to a maximum
# of 64MiB. Again, this is the same mechanism as Deluge uses.
elif alert.message().endswith("max outstanding disk writes reached"):
if settings['max_queued_disk_bytes'] <= 33554432:
self._logger.info("Setting max_queued_disk_bytes to %s", 2 * settings['max_queued_disk_bytes'])
settings['max_queued_disk_bytes'] *= 2
self.dlmgr.set_session_settings(self.dlmgr.get_session(), settings)
self.download_manager.set_session_settings(self.download_manager.get_session(), settings)

def on_torrent_removed_alert(self, alert: lt.torrent_removed_alert):
self._logger.info(f'On torrent remove alert: {alert}')
Expand Down Expand Up @@ -614,7 +614,7 @@ def get_tracker_status(self):
if info.source & info.pex:
pex_peers += 1

ltsession = self.dlmgr.get_session(self.config.get_hops())
ltsession = self.download_manager.get_session(self.config.get_hops())
public = self.tdef and not self.tdef.is_private()

result = self.tracker_status.copy()
Expand All @@ -626,10 +626,10 @@ def set_state_callback(self, usercallback):
async def state_callback_loop():
if usercallback:
when = 1
while when and not self.future_removed.done() and not self.dlmgr._shutdown:
while when and not self.future_removed.done() and not self.download_manager._shutdown:
result = usercallback(self.get_state())
when = (await result) if iscoroutine(result) else result
if when > 0.0 and not self.dlmgr._shutdown:
if when > 0.0 and not self.download_manager._shutdown:
await sleep(when)

return self.register_anonymous_task("downloads_cb", state_callback_loop)
Expand Down Expand Up @@ -684,7 +684,7 @@ def checkpoint(self):
# Libtorrent hasn't received or initialized this download yet
# 1. Check if we have data for this infohash already (don't overwrite it if we do!)
basename = hexlify(self.tdef.get_infohash()) + '.conf'
filename = Path(self.dlmgr.get_checkpoint_dir() / basename)
filename = Path(self.download_manager.get_checkpoint_dir() / basename)
if not filename.is_file():
# 2. If there is no saved data for this infohash, checkpoint it without data so we do not
# lose it when we crash or restart before the download becomes known.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from binascii import unhexlify
from copy import deepcopy
from shutil import rmtree
from typing import List, Optional
from typing import Dict, List, Optional

from ipv8.taskmanager import TaskManager, task

Expand Down Expand Up @@ -48,7 +48,8 @@
DEFAULT_DHT_ROUTERS = [
("dht.libtorrent.org", 25401),
("router.bittorrent.com", 6881),
("router.utorrent.com", 6881)
("router.utorrent.com", 6881),
("dht.transmissionbt.com", 6881),
]
DEFAULT_LT_EXTENSIONS = [
lt.create_ut_metadata_plugin,
Expand Down Expand Up @@ -86,7 +87,7 @@ def __init__(self,
self.state_dir = state_dir
self.ltsettings = {} # Stores a copy of the settings dict for each libtorrent session
self.ltsessions = {}
self.dht_health_manager = None
self.dht_health_manager: Optional[DHTHealthManager] = None
self.listen_ports = {}

self.socks_listen_ports = socks_listen_ports
Expand Down Expand Up @@ -461,7 +462,8 @@ def update_ip_filter(self, lt_session, ip_addresses):
ip_filter.add_rule(ip, ip, 0)
lt_session.set_ip_filter(ip_filter)

async def get_metainfo(self, infohash, timeout=30, hops=None, url=None):
async def get_metainfo(self, infohash: bytes, timeout: float = 30, hops: Optional[int] = None,
url: Optional[str] = None, raise_errors: bool = False) -> Optional[Dict]:
"""
Lookup metainfo for a given infohash. The mechanism works by joining the swarm for the infohash connecting
to a few peers, and downloading the metadata for the torrent.
Expand Down Expand Up @@ -490,17 +492,24 @@ async def get_metainfo(self, infohash, timeout=30, hops=None, url=None):
dcfg.set_dest_dir(self.metadata_tmpdir)
try:
download = self.start_download(tdef=tdef, config=dcfg, hidden=True, checkpoint_disabled=True)
except TypeError:
return
except TypeError as e:
self._logger.warning(e)
if raise_errors:
raise e
return None
self.metainfo_requests[infohash] = [download, 1]

try:
metainfo = download.tdef.get_metainfo() or await wait_for(shield(download.future_metainfo), timeout)
self._logger.info('Successfully retrieved metainfo for %s', infohash_hex)
self.metainfo_cache[infohash] = {'time': timemod.time(), 'meta_info': metainfo}
except (CancelledError, asyncio.TimeoutError):
metainfo = None
except (CancelledError, asyncio.TimeoutError) as e:
self._logger.warning(f'{type(e).__name__}: {e} (timeout={timeout})')
self._logger.info('Failed to retrieve metainfo for %s', infohash_hex)
if raise_errors:
raise e
return None

self._logger.info('Successfully retrieved metainfo for %s', infohash_hex)
self.metainfo_cache[infohash] = {'time': timemod.time(), 'meta_info': metainfo}

if infohash in self.metainfo_requests:
self.metainfo_requests[infohash][1] -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,23 @@
import pytest

from tribler.core.components.libtorrent.download_manager.dht_health_manager import DHTHealthManager
from tribler.core.utilities.unicode import hexlify


# pylint: disable=redefined-outer-name

@pytest.fixture
async def dht_health_manager():
manager = DHTHealthManager(lt_session=Mock())
yield manager
await manager.shutdown_task_manager()


async def test_get_health(dht_health_manager):
async def test_get_health(dht_health_manager: DHTHealthManager):
"""
Test fetching the health of a trackerless torrent.
"""
response = await dht_health_manager.get_health(b'a' * 20, timeout=0.1)
assert isinstance(response, dict)
assert 'DHT' in response
assert response['DHT'][0]['infohash'] == hexlify(b'a' * 20)
health = await dht_health_manager.get_health(b'a' * 20, timeout=0.1)
assert health.infohash == b'a' * 20


async def test_existing_get_health(dht_health_manager):
Expand Down
Loading

0 comments on commit 3e5f265

Please sign in to comment.