diff --git a/doc/conf.py b/doc/conf.py
index c997064de16..5b3dfeb3b2c 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -20,39 +20,43 @@
 import logging
 import os
 import sys
+from pathlib import Path
 
 logging.basicConfig(level=logging.INFO)
 logging.info('Start to execute conf.py')
 
-# pylint: disable=wrong-import-position
+# root_dir = Path(__file__).parent.parent
+
 root_dir = os.path.abspath(os.path.join(os.path.dirname(__name__), '..'))
 
-tribler_components = [
+tribler_source_dirs = [
     os.path.join(root_dir, "src"),
     os.path.join(root_dir, "doc"),
 ]
 
-for component in tribler_components:
-    logging.info(f'Add component: {component}')
-
-    sys.path.append(str(component))
+for source_dir in tribler_source_dirs:
+    logging.info(f'Add source dir: {source_dir}')
+    sys.path.append(str(source_dir))
 
+# pylint: disable=wrong-import-position
 from tribler.core.utilities.patch_import import patch_import
+from tribler.core.utilities.dependencies import Scope, get_dependencies
 
-# patch extra imports that can not be extracted from the `requirements-core.txt` file
-with patch_import(modules={'libtorrent', 'validate', 'file_read_backwards'}):
-    from tribler.core.utilities.dependencies import Scope, get_dependencies
+# extra modules that can't be extracted from the `requirements-core.txt` file
+modules = [
+    'validate',  # automatically installed alongside configobj 5.x; should be fixed in configobj 6.0
+]
+modules.extend(set(get_dependencies(scope=Scope.core)))
 
-    # patch imports that can be extracted from the `requirements-core.txt` file
-    with patch_import(modules=set(get_dependencies(scope=Scope.core))):
-        from tribler.core.components.restapi.rest.root_endpoint import RootEndpoint
+with patch_import(modules):
+    from tribler.core.components.restapi.rest.root_endpoint import RootEndpoint
 
-        add_endpoint = RootEndpoint.add_endpoint
-        RootEndpoint.add_endpoint = lambda self, path, ep: add_endpoint(self, path, ep) \
-            if path not in ['/ipv8', '/market', '/wallets'] else None
+    add_endpoint = RootEndpoint.add_endpoint
+    RootEndpoint.add_endpoint = lambda self, path, ep: add_endpoint(self, path, ep) \
+        if path not in ['/ipv8', '/market', '/wallets'] else None
 
-        # Extract Swagger docs
-        from extract_swagger import extract_swagger
+    # Extract Swagger docs
+    from extract_swagger import extract_swagger
 
-        asyncio.run(extract_swagger('restapi/swagger.yaml'))
+    asyncio.run(extract_swagger('restapi/swagger.yaml'))
 
 
 # -- General configuration ------------------------------------------------
diff --git a/requirements-core.txt b/requirements-core.txt
index 48fc84087d7..e7a9e7bb011 100644
--- a/requirements-core.txt
+++ b/requirements-core.txt
@@ -26,3 +26,4 @@ pyipv8==2.10.0
 libtorrent==1.2.15
 file-read-backwards==2.0.0
 Brotli==1.0.9  # to prevent AttributeError on macOs: module 'brotli' has no attribute 'error' (in urllib3.response)
+human-readable==1.3.2
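Context for the conf.py change above: the docs build imports RootEndpoint with its heavy runtime dependencies stubbed out, and the module list is now built once (extras plus everything parsed from requirements-core.txt) instead of nesting two patch_import blocks. A minimal sketch of how an import-stubbing context manager of this kind can work — an illustration under assumptions, not Tribler's actual patch_import implementation:

import sys
from contextlib import contextmanager
from unittest.mock import MagicMock


@contextmanager
def stub_imports(modules):
    """Temporarily register MagicMock stand-ins under the given module names."""
    saved = {name: sys.modules.get(name) for name in modules}
    try:
        for name in modules:
            sys.modules[name] = MagicMock()  # `import name` now yields a mock
        yield
    finally:
        for name, module in saved.items():
            if module is None:
                sys.modules.pop(name, None)  # drop the stub we installed
            else:
                sys.modules[name] = module   # restore the real module


with stub_imports({'libtorrent'}):
    import libtorrent  # resolves to a mock; no native library needed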
""" import argparse +import logging import os import re import signal @@ -12,8 +13,9 @@ from socket import inet_aton from typing import Optional -from tribler.core.config.tribler_config import TriblerConfig from tribler.core.components.session import Session +from tribler.core.config.tribler_config import TriblerConfig +from tribler.core.start_core import components_gen from tribler.core.utilities.osutils import get_appstate_dir, get_root_state_directory from tribler.core.utilities.path_util import Path from tribler.core.utilities.process_manager import ProcessKind, ProcessManager, TriblerProcess, \ @@ -107,7 +109,7 @@ async def signal_handler(sig): config.chant.testnet = True config.bandwidth_accounting.testnet = True - self.session = Session(config) + self.session = Session(config, components=list(components_gen(config))) try: await self.session.start_components() except Exception as e: @@ -117,6 +119,11 @@ async def signal_handler(sig): print("Tribler started") +def setup_logger(verbosity): + logging_level = logging.DEBUG if verbosity else logging.INFO + logging.basicConfig(level=logging_level) + + def main(argv): parser = argparse.ArgumentParser(add_help=False, description=('Tribler script, starts Tribler as a service')) parser.add_argument('--help', '-h', action='help', default=argparse.SUPPRESS, @@ -129,8 +136,10 @@ def main(argv): help='Force the usage of specific IPv8 bootstrap server (ip:port)', action=IPPortAction) parser.add_argument('--testnet', '-t', action='store_const', default=False, const=True, help='Join the testnet') + parser.add_argument('-v', '--verbosity', help='increase output verbosity', action='store_true') args = parser.parse_args(sys.argv[1:]) + setup_logger(args.verbosity) service = TriblerService() loop = get_event_loop() diff --git a/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py b/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py index e299ac0f645..0e9b31a1ca2 100644 --- a/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py +++ b/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py @@ -1,9 +1,12 @@ import math +import time from asyncio import Future +from typing import Awaitable from ipv8.taskmanager import TaskManager from tribler.core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt +from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo from tribler.core.utilities.unicode import hexlify @@ -18,18 +21,17 @@ def __init__(self, lt_session): :param lt_session: The session used to perform health lookups. """ TaskManager.__init__(self) - self.lookup_futures = {} # Map from binary infohash to future - self.bf_seeders = {} # Map from infohash to (final) seeders bloomfilter - self.bf_peers = {} # Map from infohash to (final) peers bloomfilter - self.outstanding = {} # Map from transaction_id to infohash + self.lookup_futures = {} # Map from binary infohash to future + self.bf_seeders = {} # Map from infohash to (final) seeders bloomfilter + self.bf_peers = {} # Map from infohash to (final) peers bloomfilter + self.outstanding = {} # Map from transaction_id to infohash self.lt_session = lt_session - def get_health(self, infohash, timeout=15): + def get_health(self, infohash, timeout=15) -> Awaitable[HealthInfo]: """ Lookup the health of a given infohash. :param infohash: The 20-byte infohash to lookup. :param timeout: The timeout of the lookup. 
diff --git a/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py b/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py
index e299ac0f645..0e9b31a1ca2 100644
--- a/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py
+++ b/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py
@@ -1,9 +1,12 @@
 import math
+import time
 from asyncio import Future
+from typing import Awaitable
 
 from ipv8.taskmanager import TaskManager
 
 from tribler.core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
 from tribler.core.utilities.unicode import hexlify
 
 
@@ -18,18 +21,17 @@ def __init__(self, lt_session):
         :param lt_session: The session used to perform health lookups.
         """
         TaskManager.__init__(self)
-        self.lookup_futures = {}  # Map from binary infohash to future
-        self.bf_seeders = {}  # Map from infohash to (final) seeders bloomfilter
-        self.bf_peers = {}  # Map from infohash to (final) peers bloomfilter
-        self.outstanding = {}  # Map from transaction_id to infohash
+        self.lookup_futures = {}    # Map from binary infohash to future
+        self.bf_seeders = {}        # Map from infohash to (final) seeders bloomfilter
+        self.bf_peers = {}          # Map from infohash to (final) peers bloomfilter
+        self.outstanding = {}       # Map from transaction_id to infohash
         self.lt_session = lt_session
 
-    def get_health(self, infohash, timeout=15):
+    def get_health(self, infohash, timeout=15) -> Awaitable[HealthInfo]:
         """
         Lookup the health of a given infohash.
         :param infohash: The 20-byte infohash to lookup.
         :param timeout: The timeout of the lookup.
-        :return: A Future that fires with a tuple, indicating the number of seeders and peers respectively.
         """
         if infohash in self.lookup_futures:
             return self.lookup_futures[infohash]
@@ -63,13 +65,8 @@ def finalize_lookup(self, infohash):
         seeders = DHTHealthManager.get_size_from_bloomfilter(bf_seeders)
         peers = DHTHealthManager.get_size_from_bloomfilter(bf_peers)
         if not self.lookup_futures[infohash].done():
-            self.lookup_futures[infohash].set_result({
-                "DHT": [{
-                    "infohash": hexlify(infohash),
-                    "seeders": seeders,
-                    "leechers": peers
-                }]
-            })
+            health = HealthInfo(infohash, last_check=int(time.time()), seeders=seeders, leechers=peers)
+            self.lookup_futures[infohash].set_result(health)
 
         self.lookup_futures.pop(infohash, None)
 
@@ -94,6 +91,7 @@ def get_size_from_bloomfilter(bf):
         :param bf: The bloom filter of which we estimate the size.
         :return: A rounded integer, approximating the number of items in the filter.
         """
+
         def tobits(s):
             result = []
             for c in s:
@@ -129,7 +127,7 @@ def requesting_bloomfilters(self, transaction_id, infohash):
             # Libtorrent is reusing the transaction_id, and is now using it for a infohash that we're not interested in.
             self.outstanding.pop(transaction_id, None)
 
-    def received_bloomfilters(self, transaction_id, bf_seeds=bytearray(256),  bf_peers=bytearray(256)):
+    def received_bloomfilters(self, transaction_id, bf_seeds=bytearray(256), bf_peers=bytearray(256)):
        """
         We have received bloom filters from the libtorrent DHT. Register the bloom filters and process them.
         :param transaction_id: The ID of the query for which we are receiving the bloom filter.
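With this change a DHT health lookup resolves to a single HealthInfo object rather than the old {'DHT': [{...}]} dict. A small usage sketch, mirroring the unit-test fixture changed later in this diff; with a mocked libtorrent session the lookup simply times out and resolves with zero counts:

import asyncio
from unittest.mock import Mock

from tribler.core.components.libtorrent.download_manager.dht_health_manager import DHTHealthManager


async def check(infohash: bytes) -> None:
    manager = DHTHealthManager(lt_session=Mock())  # same pattern as the test fixture
    try:
        # Resolves to a HealthInfo instead of the old nested-dict format.
        health = await manager.get_health(infohash, timeout=1)
        print(f'{health.infohash_hex}: {health.seeders} seeders, {health.leechers} leechers')
    finally:
        await manager.shutdown_task_manager()


asyncio.run(check(b'a' * 20))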
diff --git a/src/tribler/core/components/libtorrent/download_manager/download.py b/src/tribler/core/components/libtorrent/download_manager/download.py
index 08b251e420d..7462251610e 100644
--- a/src/tribler/core/components/libtorrent/download_manager/download.py
+++ b/src/tribler/core/components/libtorrent/download_manager/download.py
@@ -53,7 +53,7 @@ def __init__(self,
         self.tdef = tdef
         self.handle: Optional[lt.torrent_handle] = None
         self.state_dir = state_dir
-        self.dlmgr = download_manager
+        self.download_manager = download_manager
         self.download_defaults = download_defaults or DownloadDefaultsSettings()
         self.notifier = notifier
 
@@ -127,7 +127,7 @@ def register_alert_handler(self, alert_type: str, handler: lt.torrent_handle):
         self.alert_handlers[alert_type].append(handler)
 
     def wait_for_alert(self, success_type: str, success_getter: Optional[Callable[[Any], Any]] = None,
-                       fail_type: str = None, fail_getter: Optional[Callable[[Any], Any]] = None):
+                       fail_type: str = None, fail_getter: Optional[Callable[[Any], Any]] = None) -> Future:
         future = Future()
         if success_type:
             self.futures[success_type].append((future, future.set_result, success_getter))
@@ -213,7 +213,7 @@ def on_add_torrent_alert(self, alert: lt.add_torrent_alert):
             self.pause_after_next_hashcheck = user_stopped
 
         # Limit the amount of connections if we have specified that
-        self.handle.set_max_connections(self.dlmgr.config.max_connections_download)
+        self.handle.set_max_connections(self.download_manager.config.max_connections_download)
 
         # By default don't apply the IP filter
         self.apply_ip_filter(False)
@@ -297,7 +297,7 @@ def on_save_resume_data_alert(self, alert: lt.save_resume_data_alert):
 
         # Save it to file
         basename = hexlify(resume_data[b'info-hash']) + '.conf'
-        filename = self.dlmgr.get_checkpoint_dir() / basename
+        filename = self.download_manager.get_checkpoint_dir() / basename
         self.config.config['download_defaults']['name'] = self.tdef.get_name_as_unicode()  # store name (for debugging)
         try:
             self.config.write(str(filename))
@@ -381,25 +381,25 @@ def on_metadata_received_alert(self, alert: lt.metadata_received_alert):
     def on_performance_alert(self, alert: lt.performance_alert):
         self._logger.info(f'On performance alert: {alert}')
 
-        if self.get_anon_mode() or self.dlmgr.ltsessions is None:
+        if self.get_anon_mode() or self.download_manager.ltsessions is None:
             return
 
         # When the send buffer watermark is too low, double the buffer size to a
         # maximum of 50MiB. This is the same mechanism as Deluge uses.
-        lt_session = self.dlmgr.get_session(self.config.get_hops())
-        settings = self.dlmgr.get_session_settings(lt_session)
+        lt_session = self.download_manager.get_session(self.config.get_hops())
+        settings = self.download_manager.get_session_settings(lt_session)
         if alert.message().endswith("send buffer watermark too low (upload rate will suffer)"):
             if settings['send_buffer_watermark'] <= 26214400:
                 self._logger.info("Setting send_buffer_watermark to %s", 2 * settings['send_buffer_watermark'])
                 settings['send_buffer_watermark'] *= 2
-                self.dlmgr.set_session_settings(self.dlmgr.get_session(), settings)
+                self.download_manager.set_session_settings(self.download_manager.get_session(), settings)
         # When the write cache is too small, double the buffer size to a maximum
         # of 64MiB. Again, this is the same mechanism as Deluge uses.
         elif alert.message().endswith("max outstanding disk writes reached"):
             if settings['max_queued_disk_bytes'] <= 33554432:
                 self._logger.info("Setting max_queued_disk_bytes to %s", 2 * settings['max_queued_disk_bytes'])
                 settings['max_queued_disk_bytes'] *= 2
-                self.dlmgr.set_session_settings(self.dlmgr.get_session(), settings)
+                self.download_manager.set_session_settings(self.download_manager.get_session(), settings)
 
     def on_torrent_removed_alert(self, alert: lt.torrent_removed_alert):
         self._logger.info(f'On torrent remove alert: {alert}')
@@ -614,7 +614,7 @@ def get_tracker_status(self):
                 if info.source & info.pex:
                     pex_peers += 1
 
-        ltsession = self.dlmgr.get_session(self.config.get_hops())
+        ltsession = self.download_manager.get_session(self.config.get_hops())
         public = self.tdef and not self.tdef.is_private()
 
         result = self.tracker_status.copy()
@@ -626,10 +626,10 @@ def set_state_callback(self, usercallback):
         async def state_callback_loop():
             if usercallback:
                 when = 1
-                while when and not self.future_removed.done() and not self.dlmgr._shutdown:
+                while when and not self.future_removed.done() and not self.download_manager._shutdown:
                     result = usercallback(self.get_state())
                     when = (await result) if iscoroutine(result) else result
-                    if when > 0.0 and not self.dlmgr._shutdown:
+                    if when > 0.0 and not self.download_manager._shutdown:
                         await sleep(when)
 
         return self.register_anonymous_task("downloads_cb", state_callback_loop)
@@ -684,7 +684,7 @@ def checkpoint(self):
             # Libtorrent hasn't received or initialized this download yet
             # 1. Check if we have data for this infohash already (don't overwrite it if we do!)
             basename = hexlify(self.tdef.get_infohash()) + '.conf'
-            filename = Path(self.dlmgr.get_checkpoint_dir() / basename)
+            filename = Path(self.download_manager.get_checkpoint_dir() / basename)
             if not filename.is_file():
                 # 2. If there is no saved data for this infohash, checkpoint it without data so we do not
                 #    lose it when we crash or restart before the download becomes known.
diff --git a/src/tribler/core/components/libtorrent/download_manager/download_manager.py b/src/tribler/core/components/libtorrent/download_manager/download_manager.py
index 8c0a0fb33d6..7028869cf25 100644
--- a/src/tribler/core/components/libtorrent/download_manager/download_manager.py
+++ b/src/tribler/core/components/libtorrent/download_manager/download_manager.py
@@ -11,7 +11,7 @@
 from binascii import unhexlify
 from copy import deepcopy
 from shutil import rmtree
-from typing import List, Optional
+from typing import Dict, List, Optional
 
 from ipv8.taskmanager import TaskManager, task
 
@@ -48,7 +48,8 @@
 DEFAULT_DHT_ROUTERS = [
     ("dht.libtorrent.org", 25401),
     ("router.bittorrent.com", 6881),
-    ("router.utorrent.com", 6881)
+    ("router.utorrent.com", 6881),
+    ("dht.transmissionbt.com", 6881),
 ]
 DEFAULT_LT_EXTENSIONS = [
     lt.create_ut_metadata_plugin,
@@ -86,7 +87,7 @@ def __init__(self,
         self.state_dir = state_dir
         self.ltsettings = {}  # Stores a copy of the settings dict for each libtorrent session
         self.ltsessions = {}
-        self.dht_health_manager = None
+        self.dht_health_manager: Optional[DHTHealthManager] = None
         self.listen_ports = {}
 
         self.socks_listen_ports = socks_listen_ports
@@ -461,7 +462,8 @@ def update_ip_filter(self, lt_session, ip_addresses):
             ip_filter.add_rule(ip, ip, 0)
         lt_session.set_ip_filter(ip_filter)
 
-    async def get_metainfo(self, infohash, timeout=30, hops=None, url=None):
+    async def get_metainfo(self, infohash: bytes, timeout: float = 30, hops: Optional[int] = None,
+                           url: Optional[str] = None, raise_errors: bool = False) -> Optional[Dict]:
         """
         Lookup metainfo for a given infohash. The mechanism works by joining the swarm for the infohash connecting
         to a few peers, and downloading the metadata for the torrent.
@@ -490,17 +492,24 @@ async def get_metainfo(self, infohash, timeout=30, hops=None, url=None):
         dcfg.set_dest_dir(self.metadata_tmpdir)
         try:
             download = self.start_download(tdef=tdef, config=dcfg, hidden=True, checkpoint_disabled=True)
-        except TypeError:
-            return
+        except TypeError as e:
+            self._logger.warning(e)
+            if raise_errors:
+                raise e
+            return None
 
         self.metainfo_requests[infohash] = [download, 1]
         try:
             metainfo = download.tdef.get_metainfo() or await wait_for(shield(download.future_metainfo), timeout)
-            self._logger.info('Successfully retrieved metainfo for %s', infohash_hex)
-            self.metainfo_cache[infohash] = {'time': timemod.time(), 'meta_info': metainfo}
-        except (CancelledError, asyncio.TimeoutError):
-            metainfo = None
+        except (CancelledError, asyncio.TimeoutError) as e:
+            self._logger.warning(f'{type(e).__name__}: {e} (timeout={timeout})')
             self._logger.info('Failed to retrieve metainfo for %s', infohash_hex)
+            if raise_errors:
+                raise e
+            return None
+
+        self._logger.info('Successfully retrieved metainfo for %s', infohash_hex)
+        self.metainfo_cache[infohash] = {'time': timemod.time(), 'meta_info': metainfo}
 
         if infohash in self.metainfo_requests:
             self.metainfo_requests[infohash][1] -= 1
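get_metainfo previously swallowed failures and returned None; with raise_errors=True the caller can now observe the failure. A hedged calling sketch — the download_manager instance is assumed to exist, and the exception types are the ones handled in the hunk above:

import asyncio


async def fetch_metainfo(download_manager, infohash: bytes):
    try:
        return await download_manager.get_metainfo(infohash, timeout=30, raise_errors=True)
    except TypeError as e:
        print(f'could not start the metainfo download: {e}')  # bad tdef/config
    except (asyncio.CancelledError, asyncio.TimeoutError) as e:
        print(f'metainfo lookup failed: {type(e).__name__}')
    return None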
diff --git a/src/tribler/core/components/libtorrent/tests/test_dht_health_manager.py b/src/tribler/core/components/libtorrent/tests/test_dht_health_manager.py
index 7c8f4406165..6d45bb94c2f 100644
--- a/src/tribler/core/components/libtorrent/tests/test_dht_health_manager.py
+++ b/src/tribler/core/components/libtorrent/tests/test_dht_health_manager.py
@@ -5,9 +5,10 @@
 import pytest
 
 from tribler.core.components.libtorrent.download_manager.dht_health_manager import DHTHealthManager
-from tribler.core.utilities.unicode import hexlify
+# pylint: disable=redefined-outer-name
+
 
 @pytest.fixture
 async def dht_health_manager():
     manager = DHTHealthManager(lt_session=Mock())
@@ -15,14 +16,12 @@ async def dht_health_manager():
     await manager.shutdown_task_manager()
 
 
-async def test_get_health(dht_health_manager):
+async def test_get_health(dht_health_manager: DHTHealthManager):
     """
     Test fetching the health of a trackerless torrent.
     """
-    response = await dht_health_manager.get_health(b'a' * 20, timeout=0.1)
-    assert isinstance(response, dict)
-    assert 'DHT' in response
-    assert response['DHT'][0]['infohash'] == hexlify(b'a' * 20)
+    health = await dht_health_manager.get_health(b'a' * 20, timeout=0.1)
+    assert health.infohash == b'a' * 20
 
 
 async def test_existing_get_health(dht_health_manager):
diff --git a/src/tribler/core/components/libtorrent/tests/test_download.py b/src/tribler/core/components/libtorrent/tests/test_download.py
index a2e73c9a733..b79adedac6c 100644
--- a/src/tribler/core/components/libtorrent/tests/test_download.py
+++ b/src/tribler/core/components/libtorrent/tests/test_download.py
@@ -38,7 +38,7 @@ async def test_download_resume_in_upload_mode(mock_handle, mock_download_config,
     test_download.handle.set_upload_mode.assert_called_with(test_download.get_upload_mode())
 
 
-async def test_save_resume(mock_handle, test_download, test_tdef):
+async def test_save_resume(mock_handle, test_download: Download, test_tdef):
     """
     testing call resume data alert
     """
@@ -49,7 +49,7 @@ async def test_save_resume(mock_handle, test_download, test_tdef):
     alert = Mock(resume_data={b'info-hash': test_tdef.get_infohash()})
     await test_download.save_resume_data()
     basename = hexlify(test_tdef.get_infohash()) + '.conf'
-    filename = test_download.dlmgr.get_checkpoint_dir() / basename
+    filename = test_download.download_manager.get_checkpoint_dir() / basename
     dcfg = DownloadConfig.load(str(filename))
     assert test_tdef.get_infohash() == dcfg.get_engineresumedata().get(b'info-hash')
 
@@ -78,7 +78,7 @@ def mock_move(s):
 async def test_save_checkpoint(test_download, test_tdef):
     await test_download.checkpoint()
     basename = hexlify(test_tdef.get_infohash()) + '.conf'
-    filename = Path(test_download.dlmgr.get_checkpoint_dir() / basename)
+    filename = Path(test_download.download_manager.get_checkpoint_dir() / basename)
     assert filename.is_file()
 
 
diff --git a/src/tribler/core/components/metadata_store/db/orm_bindings/torrent_state.py b/src/tribler/core/components/metadata_store/db/orm_bindings/torrent_state.py
index 0e501100226..0d8d15aee93 100644
--- a/src/tribler/core/components/metadata_store/db/orm_bindings/torrent_state.py
+++ b/src/tribler/core/components/metadata_store/db/orm_bindings/torrent_state.py
@@ -1,4 +1,7 @@
+from __future__ import annotations
+
 from pony import orm
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
 
 
 def define_binding(db):
@@ -17,4 +20,13 @@ class TorrentState(db.Entity):
         metadata = orm.Set('TorrentMetadata', reverse='health')
         trackers = orm.Set('TrackerState', reverse='torrents')
 
+        @classmethod
+        def from_health(cls, health: HealthInfo):
+            return cls(infohash=health.infohash, seeders=health.seeders, leechers=health.leechers,
+                       last_check=health.last_check)
+
+        def to_health(self) -> HealthInfo:
+            return HealthInfo(infohash=self.infohash, last_check=self.last_check,
+                              seeders=self.seeders, leechers=self.leechers)
+
     return TorrentState
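The two ORM helpers above make TorrentState rows and HealthInfo dataclasses interchangeable. A round-trip sketch under assumptions: `mds` is a hypothetical, already-initialized MetadataStore, and the calls must run inside a pony db_session:

import time

from pony.orm import db_session

from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo


def roundtrip(mds) -> None:
    health = HealthInfo(b'a' * 20, last_check=int(time.time()), seeders=5, leechers=2)
    with db_session:
        state = mds.TorrentState.from_health(health)  # persist the dataclass
        assert state.to_health() == health            # read back an equal copy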
diff --git a/src/tribler/core/components/metadata_store/db/store.py b/src/tribler/core/components/metadata_store/db/store.py
index 17fde841ea9..0f4f75c78ba 100644
--- a/src/tribler/core/components/metadata_store/db/store.py
+++ b/src/tribler/core/components/metadata_store/db/store.py
@@ -23,7 +23,7 @@
     metadata_node,
     misc,
     torrent_metadata,
-    torrent_state,
+    torrent_state as torrent_state_,
     tracker_state,
     vsids,
 )
@@ -44,6 +44,7 @@
     read_payload_with_offset,
 )
 from tribler.core.components.metadata_store.remote_query_community.payload_checker import process_payload
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
 from tribler.core.exceptions import InvalidSignatureException
 from tribler.core.utilities.notifier import Notifier
 from tribler.core.utilities.path_util import Path
@@ -188,7 +189,7 @@ def on_connect(_, connection):
 
         self.MiscData = misc.define_binding(self.db)
 
         self.TrackerState = tracker_state.define_binding(self.db)
-        self.TorrentState = torrent_state.define_binding(self.db)
+        self.TorrentState = torrent_state_.define_binding(self.db)
 
         self.ChannelNode = channel_node.define_binding(self.db, logger=self._logger, key=my_key)
 
@@ -471,23 +472,29 @@ def process_compressed_mdblob(self, compressed_data, **kwargs):
         return self.process_squashed_mdblob(decompressed_data, health_info=health_info, **kwargs)
 
-    def process_torrent_health(self, infohash: bytes, seeders: int, leechers: int, last_check: int) -> bool:
+    def process_torrent_health(self, health: HealthInfo) -> bool:
         """
         Adds or updates information about a torrent health for the torrent with the specified infohash value
-        :param infohash: the infohash of the torrent
-        :param seeders: a number of seeders
-        :param leechers: a number of leechers
-        :param last_check: a timestamp when the seeders/leechers count was checked
+        :param health: a health info of a torrent
         :return: True if a new TorrentState object was added
         """
-        health = self.TorrentState.get_for_update(infohash=infohash)
-        if health and last_check > health.last_check:
-            health.set(seeders=seeders, leechers=leechers, last_check=last_check)
-            self._logger.debug(f"Update health info for {hexlify(infohash)}: ({seeders},{leechers})")
-        elif not health:
-            self.TorrentState(infohash=infohash, seeders=seeders, leechers=leechers, last_check=last_check)
-            self._logger.debug(f"Add health info for {hexlify(infohash)}: ({seeders},{leechers})")
+        if not health.is_valid():
+            self._logger.warning(f'Invalid health info ignored: {health}')
+            return False
+
+        torrent_state = self.TorrentState.get_for_update(infohash=health.infohash)
+
+        if torrent_state and health.should_update(torrent_state):
+            self._logger.debug(f"Update health info {health}")
+            torrent_state.set(seeders=health.seeders, leechers=health.leechers, last_check=health.last_check,
+                              self_checked=False)
+            return False
+
+        if not torrent_state:
+            self._logger.debug(f"Add health info {health}")
+            self.TorrentState.from_health(health)
             return True
+        return False
 
     def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None, **kwargs):
@@ -513,7 +520,9 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info
             with db_session:
                 for payload, (seeders, leechers, last_check) in zip(payload_list, health_info):
                     if hasattr(payload, 'infohash'):
-                        self.process_torrent_health(payload.infohash, seeders, leechers, last_check)
+                        health = HealthInfo(payload.infohash, last_check=last_check,
+                                            seeders=seeders, leechers=leechers)
+                        self.process_torrent_health(health)
 
         result = []
         total_size = len(payload_list)
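The rewritten process_torrent_health has three outcomes: invalid input is dropped, fresher info updates an existing row (returning False), and only a genuinely new infohash creates a row (returning True). A behavior sketch under the assumption of an initialized MetadataStore passed in as `mds`:

import time

from pony.orm import db_session

from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo


def demo(mds) -> None:
    now = int(time.time())
    with db_session:
        first = HealthInfo(b'x' * 20, last_check=now - 10, seeders=10, leechers=1)
        assert mds.process_torrent_health(first) is True         # new row: added

        fresher = HealthInfo(b'x' * 20, last_check=now, seeders=12, leechers=2)
        assert mds.process_torrent_health(fresher) is False      # row updated, not added

        from_future = HealthInfo(b'x' * 20, last_check=now + 7200, seeders=1, leechers=1)
        assert mds.process_torrent_health(from_future) is False  # fails is_valid(): ignored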
diff --git a/src/tribler/core/components/metadata_store/restapi/metadata_endpoint.py b/src/tribler/core/components/metadata_store/restapi/metadata_endpoint.py
index 48be6e537f2..2b8426e05c2 100644
--- a/src/tribler/core/components/metadata_store/restapi/metadata_endpoint.py
+++ b/src/tribler/core/components/metadata_store/restapi/metadata_endpoint.py
@@ -1,14 +1,11 @@
+from asyncio import create_task
 from binascii import unhexlify
 
 from aiohttp import ContentTypeError, web
-
 from aiohttp_apispec import docs
-
 from ipv8.REST.base_endpoint import HTTP_BAD_REQUEST, HTTP_NOT_FOUND
 from ipv8.REST.schema import schema
-
 from marshmallow.fields import Integer, String
-
 from pony.orm import db_session
 
 from tribler.core.components.metadata_store.db.orm_bindings.channel_node import LEGACY_ENTRY
@@ -194,24 +191,6 @@ async def get_channel_entries(self, request):
                 'default': 20,
                 'required': False,
             },
-            {
-                'in': 'query',
-                'name': 'refresh',
-                'description': 'Whether or not to force a health recheck. Settings this to 0 means that the '
-                               'health of a torrent will not be checked again if it was recently checked.',
-                'type': 'integer',
-                'enum': [0, 1],
-                'required': False,
-            },
-            {
-                'in': 'query',
-                'name': 'nowait',
-                'description': 'Whether or not to return immediately. If enabled, results '
-                               'will be passed through to the events endpoint.',
-                'type': 'integer',
-                'enum': [0, 1],
-                'required': False,
-            },
         ],
         responses={
             200: {
@@ -239,22 +218,12 @@ async def get_channel_entries(self, request):
         },
     )
     async def get_torrent_health(self, request):
-        timeout = request.query.get('timeout')
-        if not timeout:
-            timeout = TORRENT_CHECK_TIMEOUT
-        elif timeout.isdigit():
-            timeout = int(timeout)
-        else:
-            return RESTResponse({"error": f"Error processing timeout parameter '{timeout}'"}, status=HTTP_BAD_REQUEST)
-        refresh = request.query.get('refresh', '0') == '1'
-        nowait = request.query.get('nowait', '0') == '1'
+        self._logger.info(f'Get torrent health request: {request}')
+        try:
+            timeout = int(request.query.get('timeout', TORRENT_CHECK_TIMEOUT))
+        except ValueError as e:
+            return RESTResponse({"error": f"Error processing timeout parameter: {e}"}, status=HTTP_BAD_REQUEST)
 
         infohash = unhexlify(request.match_info['infohash'])
-        result_future = self.torrent_checker.check_torrent_health(infohash, timeout=timeout, scrape_now=refresh)
-        # Return immediately. Used by GUI to schedule health updates through the EventsEndpoint
-        if nowait:
-            return RESTResponse({'checking': '1'})
-
-        # Errors will be handled by error_middleware
-        result = await result_future
-        return RESTResponse({'health': result})
+        create_task(self.torrent_checker.check_torrent_health(infohash, timeout=timeout, scrape_now=True))
+        return RESTResponse({'checking': '1'})
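Client-side consequence of this endpoint change: the request now always returns {'checking': '1'} immediately, and the measured health arrives later through the events endpoint. A hedged aiohttp sketch — host, port, and the omitted API-key handling are illustrative and depend on the local REST configuration:

import asyncio

from aiohttp import ClientSession


async def request_health_check(infohash_hex: str) -> None:
    # Port and prefix are assumptions for illustration only.
    url = f'http://localhost:52194/metadata/torrents/{infohash_hex}/health?timeout=20'
    async with ClientSession() as session:
        async with session.get(url) as response:
            print(await response.json())  # -> {'checking': '1'}


asyncio.run(request_health_check('aa' * 20))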
diff --git a/src/tribler/core/components/metadata_store/restapi/tests/test_metadata_endpoint.py b/src/tribler/core/components/metadata_store/restapi/tests/test_metadata_endpoint.py
index 973b31dd0a5..866083b51dd 100644
--- a/src/tribler/core/components/metadata_store/restapi/tests/test_metadata_endpoint.py
+++ b/src/tribler/core/components/metadata_store/restapi/tests/test_metadata_endpoint.py
@@ -3,7 +3,6 @@
 import pytest
 from aiohttp.web_app import Application
-from ipv8.util import succeed
 from pony.orm import db_session
 
 from tribler.core.components.metadata_store.db.orm_bindings.channel_node import COMMITTED, TODELETE, UPDATED
@@ -15,7 +14,7 @@
 from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker
 from tribler.core.config.tribler_config import TriblerConfig
 from tribler.core.utilities.unicode import hexlify
-from tribler.core.utilities.utilities import has_bep33_support, random_infohash
+from tribler.core.utilities.utilities import random_infohash
 
 # pylint: disable=unused-argument
 
@@ -210,32 +209,8 @@ async def test_check_torrent_health(rest_api, mock_dlmgr, udp_tracker, metadata_
     Test the endpoint to fetch the health of a chant-managed, infohash-only torrent
     """
     infohash = b'a' * 20
-    tracker_url = f'udp://localhost:{udp_tracker.port}/announce'
-    udp_tracker.tracker_info.add_info_about_infohash(infohash, 12, 11, 1)
-
-    with db_session:
-        tracker_state = metadata_store.TrackerState(url=tracker_url)
-        torrent_state = metadata_store.TorrentState(trackers=tracker_state, infohash=infohash)
-        metadata_store.TorrentMetadata(
-            infohash=infohash, title='ubuntu-torrent.iso', size=42, tracker_info=tracker_url, health=torrent_state
-        )
-    url = f'metadata/torrents/{hexlify(infohash)}/health?timeout={TORRENT_CHECK_TIMEOUT}&refresh=1'
-
-    # Add mock DHT response - we both need to account for the case when BEP33 is used and the old lookup method
-    mock_dlmgr.get_metainfo = lambda _, **__: succeed(None)
-    dht_health_dict = {"infohash": hexlify(infohash), "seeders": 1, "leechers": 2}
-    mock_dlmgr.dht_health_manager.get_health = lambda *_, **__: succeed({"DHT": [dht_health_dict]})
-
-    # Left for compatibility with other tests in this object
-    await udp_tracker.start()
-
+    url = f'metadata/torrents/{hexlify(infohash)}/health?timeout={TORRENT_CHECK_TIMEOUT}'
     json_response = await do_request(rest_api, url)
-    assert "health" in json_response
-    assert f"udp://localhost:{udp_tracker.port}" in json_response['health']
-    if has_bep33_support():
-        assert "DHT" in json_response['health']
-
-    json_response = await do_request(rest_api, url + '&nowait=1')
     assert json_response == {'checking': '1'}
diff --git a/src/tribler/core/components/popularity/community/payload.py b/src/tribler/core/components/popularity/community/payload.py
index 340f1da527c..c53411bd1de 100644
--- a/src/tribler/core/components/popularity/community/payload.py
+++ b/src/tribler/core/components/popularity/community/payload.py
@@ -1,7 +1,11 @@
+from typing import List
+
 from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
 from ipv8.messaging.payload_dataclass import dataclass
 from ipv8.messaging.serialization import default_serializer
 
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
+
 
 @vp_compile
 class TorrentInfoFormat(VariablePayload):
@@ -40,9 +44,13 @@ def fix_unpack_torrents_checked(cls, value):
         return [payload.to_tuple() for payload in TorrentInfoFormat.from_list_bytes(value)]
 
     @classmethod
-    def create(cls, random_torrents_checked, popular_torrents_checked):
+    def create(cls, random_torrents_checked: List[HealthInfo], popular_torrents_checked: List[HealthInfo]):
+        random_torrent_tuples = [(health.infohash, health.seeders, health.leechers, health.last_check)
+                                 for health in random_torrents_checked]
+        popular_torrent_tuples = [(health.infohash, health.seeders, health.leechers, health.last_check)
+                                  for health in popular_torrents_checked]
         return cls(len(random_torrents_checked), len(popular_torrents_checked),
-                   random_torrents_checked, popular_torrents_checked)
+                   random_torrent_tuples, popular_torrent_tuples)
 
 
 @dataclass(msg_id=2)
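TorrentsHealthPayload.create now accepts HealthInfo objects and converts them to wire-format tuples itself. A construction sketch; the imports follow the paths used in this diff, and the random_torrents field name is taken from its use in on_torrents_health below:

import time

from tribler.core.components.popularity.community.payload import TorrentsHealthPayload
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo

now = int(time.time())
random_checked = [HealthInfo(b'a' * 20, last_check=now, seeders=10, leechers=2)]
popular_checked = [HealthInfo(b'b' * 20, last_check=now, seeders=100, leechers=20)]

payload = TorrentsHealthPayload.create(random_checked, popular_checked)
# The payload carries tuples of (infohash, seeders, leechers, last_check).
assert len(payload.random_torrents) == 1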
diff --git a/src/tribler/core/components/popularity/community/popularity_community.py b/src/tribler/core/components/popularity/community/popularity_community.py
index e47531d3940..bad582fa216 100644
--- a/src/tribler/core/components/popularity/community/popularity_community.py
+++ b/src/tribler/core/components/popularity/community/popularity_community.py
@@ -1,19 +1,25 @@
-import heapq
+from __future__ import annotations
+
 import random
 from binascii import unhexlify
+from typing import List, TYPE_CHECKING
 
 from ipv8.lazy_community import lazy_wrapper
-
 from pony.orm import db_session
 
 from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
-from tribler.core.components.popularity.community.payload import TorrentsHealthPayload, PopularTorrentsRequest
+from tribler.core.components.popularity.community.payload import PopularTorrentsRequest, TorrentsHealthPayload
 from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
 from tribler.core.utilities.pony_utils import run_threaded
 from tribler.core.utilities.unicode import hexlify
 from tribler.core.utilities.utilities import get_normally_distributed_positive_integers
 
+if TYPE_CHECKING:
+    from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker
+
+
 class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin):
     """
     Community for disseminating the content across the network.
@@ -35,7 +41,7 @@ class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin):
     def __init__(self, *args, torrent_checker=None, **kwargs):
         # Creating a separate instance of Network for this community to find more peers
         super().__init__(*args, **kwargs)
-        self.torrent_checker = torrent_checker
+        self.torrent_checker: TorrentChecker = torrent_checker
 
         self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health)
         self.add_message_handler(PopularTorrentsRequest, self.on_popular_torrents_request)
@@ -52,13 +58,12 @@ def introduction_request_callback(self, peer, dist, payload):
         # Send request to peer to send popular torrents
         self.ez_send(peer, PopularTorrentsRequest())
 
-    def get_alive_checked_torrents(self):
-        if not self.torrent_checker or not self.torrent_checker.torrents_checked:
+    def get_alive_checked_torrents(self) -> List[HealthInfo]:
+        if not self.torrent_checker:
             return []
 
         # Filter torrents that have seeders
-        alive = {(_, seeders, *rest) for (_, seeders, *rest) in self.torrent_checker.torrents_checked if seeders > 0}
-        return alive
+        return [health for health in self.torrent_checker.torrents_checked.values() if health.seeders > 0]
 
     def gossip_random_torrents_health(self):
         """
@@ -75,22 +80,24 @@ def gossip_random_torrents_health(self):
     @lazy_wrapper(TorrentsHealthPayload)
     async def on_torrents_health(self, peer, payload):
         self.logger.debug(f"Received torrent health information for "
-                           f"{len(payload.torrents_checked)} popular torrents and"
-                           f" {len(payload.random_torrents)} random torrents")
+                          f"{len(payload.torrents_checked)} popular torrents and"
+                          f" {len(payload.random_torrents)} random torrents")
 
-        torrents = payload.random_torrents + payload.torrents_checked
+        health_tuples = payload.random_torrents + payload.torrents_checked
+        health_list = [HealthInfo(infohash, last_check=last_check, seeders=seeders, leechers=leechers)
+                       for infohash, seeders, leechers, last_check in health_tuples]
 
-        for infohash in await run_threaded(self.mds.db, self.process_torrents_health, torrents):
+        for infohash in await run_threaded(self.mds.db, self.process_torrents_health, health_list):
             # Get a single result per infohash to avoid duplicates
             self.send_remote_select(peer=peer, infohash=infohash, last=1)
 
     @db_session
-    def process_torrents_health(self, torrent_healths):
+    def process_torrents_health(self, health_list: List[HealthInfo]):
         infohashes_to_resolve = set()
-        for infohash, seeders, leechers, last_check in torrent_healths:
-            added = self.mds.process_torrent_health(infohash, seeders, leechers, last_check)
+        for health in health_list:
+            added = self.mds.process_torrent_health(health)
             if added:
-                infohashes_to_resolve.add(infohash)
+                infohashes_to_resolve.add(health.infohash)
         return infohashes_to_resolve
 
     @lazy_wrapper(PopularTorrentsRequest)
@@ -99,20 +106,20 @@ async def on_popular_torrents_request(self, peer, payload):
         popular_torrents = self.get_likely_popular_torrents()
         self.ez_send(peer, TorrentsHealthPayload.create({}, popular_torrents))
 
-    def get_likely_popular_torrents(self):
+    def get_likely_popular_torrents(self) -> List[HealthInfo]:
         checked_and_alive = self.get_alive_checked_torrents()
         if not checked_and_alive:
-            return {}
+            return []
 
         num_torrents = len(checked_and_alive)
         num_torrents_to_send = min(PopularityCommunity.GOSSIP_RANDOM_TORRENT_COUNT, num_torrents)
         likely_popular_indices = self._get_likely_popular_indices(num_torrents_to_send, num_torrents)
 
-        sorted_torrents = sorted(list(checked_and_alive), key=lambda t: -t[1])
-        likely_popular_torrents = {sorted_torrents[i] for i in likely_popular_indices}
+        sorted_torrents = sorted(list(checked_and_alive), key=lambda health: -health.seeders)
+        likely_popular_torrents = [sorted_torrents[i] for i in likely_popular_indices]
 
         return likely_popular_torrents
 
-    def _get_likely_popular_indices(self, size, limit):
+    def _get_likely_popular_indices(self, size, limit) -> List[int]:
        """
         Returns a list of indices favoring the lower value numbers.
 
@@ -124,13 +131,13 @@ def _get_likely_popular_indices(self, size, limit):
         """
         return get_normally_distributed_positive_integers(size=size, upper_limit=limit)
 
-    def get_random_torrents(self):
+    def get_random_torrents(self) -> List[HealthInfo]:
         checked_and_alive = list(self.get_alive_checked_torrents())
         if not checked_and_alive:
-            return {}
+            return []
 
         num_torrents = len(checked_and_alive)
         num_torrents_to_send = min(PopularityCommunity.GOSSIP_RANDOM_TORRENT_COUNT, num_torrents)
 
-        random_torrents = set(random.sample(checked_and_alive, num_torrents_to_send))
+        random_torrents = random.sample(checked_and_alive, num_torrents_to_send)
         return random_torrents
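torrent_checker.torrents_checked is now a dict keyed by infohash with HealthInfo values, so the "alive" filter above becomes a comprehension over .values(). A self-contained illustration of that filtering logic:

import time

from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo

now = int(time.time())
torrents_checked = {
    b'a' * 20: HealthInfo(b'a' * 20, last_check=now, seeders=3, leechers=1),
    b'b' * 20: HealthInfo(b'b' * 20, last_check=now, seeders=0, leechers=7),
}

# Same filter as get_alive_checked_torrents: keep only entries with seeders.
alive = [health for health in torrents_checked.values() if health.seeders > 0]
assert [health.infohash for health in alive] == [b'a' * 20]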
diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py
index 01b7159e0cc..3f904b2400f 100644
--- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py
+++ b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py
@@ -1,6 +1,6 @@
 import time
 from random import randint
-from typing import Set, Tuple
+from typing import List
 from unittest.mock import Mock
 
 from ipv8.keyvault.crypto import default_eccrypto
@@ -12,12 +12,13 @@
 from tribler.core.components.metadata_store.db.store import MetadataStore
 from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings
 from tribler.core.components.popularity.community.popularity_community import PopularityCommunity
+from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo
 from tribler.core.tests.tools.base_test import MockObject
 from tribler.core.utilities.path_util import Path
 from tribler.core.utilities.utilities import random_infohash
 
 
-def _generate_single_checked_torrent(status: str = None) -> Tuple:
+def _generate_single_checked_torrent(status: str = None) -> HealthInfo:
     """
     Assumptions
     DEAD -> peers: 0
@@ -31,12 +32,12 @@ def get_peers_for(health_status):
             return randint(101, 1000)
         return randint(1, 100)
 
-    # checked torrent structure is (infohash, seeders, leechers, last_check)
-    return random_infohash(), get_peers_for(status), get_peers_for(status), int(time.time())
+    return HealthInfo(random_infohash(), last_check=int(time.time()),
+                      seeders=get_peers_for(status), leechers=get_peers_for(status))
 
 
-def _generate_checked_torrents(count: int, status: str = None) -> Set:
-    return {_generate_single_checked_torrent(status) for _ in range(count)}
+def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInfo]:
+    return [_generate_single_checked_torrent(status) for _ in range(count)]
 
 
 class TestPopularityCommunity(TestBase):
@@ -59,7 +60,7 @@ def create_node(self, *args, **kwargs):
                             default_eccrypto.generate_key("curve25519"))
         self.metadata_store_set.add(mds)
         torrent_checker = MockObject()
-        torrent_checker.torrents_checked = set()
+        torrent_checker.torrents_checked = {}
 
         self.count += 1
@@ -76,8 +77,8 @@ def fill_database(self, metadata_store, last_check_now=False):
             metadata_store.TorrentState(
                 infohash=str(torrent_ind).encode() * 20, seeders=torrent_ind + 1, last_check=last_check)
 
-    async def init_first_node_and_gossip(self, checked_torrent_info, deliver_timeout=.1):
-        self.nodes[0].overlay.torrent_checker.torrents_checked.add(checked_torrent_info)
+    async def init_first_node_and_gossip(self, checked_torrent_info: HealthInfo, deliver_timeout: float = 0.1):
+        self.nodes[0].overlay.torrent_checker.torrents_checked[checked_torrent_info.infohash] = checked_torrent_info
         await self.introduce_nodes()
 
         self.nodes[0].overlay.gossip_random_torrents_health()
@@ -88,7 +89,7 @@ async def test_torrents_health_gossip(self):
         """
         Test whether torrent health information is correctly gossiped around
         """
-        checked_torrent_info = (b'a' * 20, 200, 0, int(time.time()))
+        checked_torrent_info = HealthInfo(b'a' * 20, seeders=200, leechers=0, last_check=int(time.time()))
 
         node0_db = self.nodes[0].overlay.mds.TorrentState
         node1_db2 = self.nodes[1].overlay.mds.TorrentState
@@ -101,21 +102,22 @@ async def test_torrents_health_gossip(self):
         # Check whether node 1 has new torrent health information
         with db_session:
             torrent = node1_db2.select().first()
-            assert torrent.infohash == checked_torrent_info[0]
-            assert torrent.seeders == checked_torrent_info[1]
-            assert torrent.leechers == checked_torrent_info[2]
-            assert torrent.last_check == checked_torrent_info[3]
+            assert torrent.infohash == checked_torrent_info.infohash
+            assert torrent.seeders == checked_torrent_info.seeders
+            assert torrent.leechers == checked_torrent_info.leechers
+            assert torrent.last_check == checked_torrent_info.last_check
 
     def test_get_alive_torrents(self):
         dead_torrents = _generate_checked_torrents(100, 'DEAD')
         popular_torrents = _generate_checked_torrents(100, 'POPULAR')
         alive_torrents = _generate_checked_torrents(100)
 
-        all_checked_torrents = dead_torrents | alive_torrents | popular_torrents
-        self.nodes[0].overlay.torrent_checker.torrents_checked.update(all_checked_torrents)
+        all_checked_torrents = dead_torrents + alive_torrents + popular_torrents
+        self.nodes[0].overlay.torrent_checker.torrents_checked.update(
+            {health.infohash: health for health in all_checked_torrents})
 
         actual_alive_torrents = self.nodes[0].overlay.get_alive_checked_torrents()
-        assert len(actual_alive_torrents) == len(alive_torrents | popular_torrents)
+        assert len(actual_alive_torrents) == len(alive_torrents + popular_torrents)
 
     async def test_torrents_health_gossip_multiple(self):
         """
@@ -125,7 +127,7 @@ async def test_torrents_health_gossip_multiple(self):
         popular_torrents = _generate_checked_torrents(100, 'POPULAR')
         alive_torrents = _generate_checked_torrents(100)
 
-        all_checked_torrents = dead_torrents | alive_torrents | popular_torrents
+        all_checked_torrents = dead_torrents + alive_torrents + popular_torrents
 
         node0_db = self.nodes[0].overlay.mds.TorrentState
         node1_db = self.nodes[1].overlay.mds.TorrentState
@@ -138,7 +140,8 @@ async def test_torrents_health_gossip_multiple(self):
         assert node1_count == 0
 
         # Setup, node 0 checks some torrents, both dead and alive (including popular ones).
-        self.nodes[0].overlay.torrent_checker.torrents_checked.update(all_checked_torrents)
+        self.nodes[0].overlay.torrent_checker.torrents_checked.update(
+            {health.infohash: health for health in all_checked_torrents})
 
         # Nodes are introduced
         await self.introduce_nodes()
@@ -177,7 +180,7 @@ async def test_torrents_health_update(self):
         """
         self.fill_database(self.nodes[1].overlay.mds)
 
-        checked_torrent_info = (b'0' * 20, 200, 0, int(time.time()))
+        checked_torrent_info = HealthInfo(b'0' * 20, seeders=200, leechers=0, last_check=int(time.time()))
         await self.init_first_node_and_gossip(checked_torrent_info, deliver_timeout=0.5)
 
         # Check whether node 1 has new torrent health information
@@ -193,7 +196,8 @@ async def test_unknown_torrent_query_back(self):
         infohash = b'1' * 20
         with db_session:
             self.nodes[0].overlay.mds.TorrentMetadata(infohash=infohash)
-        await self.init_first_node_and_gossip((infohash, 200, 0, int(time.time())))
+        await self.init_first_node_and_gossip(
+            HealthInfo(infohash, seeders=200, leechers=0, last_check=int(time.time())))
         with db_session:
             assert self.nodes[1].overlay.mds.TorrentMetadata.get()
 
@@ -204,5 +208,6 @@ async def test_skip_torrent_query_back_for_known_torrent(self):
             self.nodes[0].overlay.mds.TorrentMetadata(infohash=infohash)
             self.nodes[1].overlay.mds.TorrentMetadata(infohash=infohash)
         self.nodes[1].overlay.send_remote_select = Mock()
-        await self.init_first_node_and_gossip((infohash, 200, 0, int(time.time())))
+        await self.init_first_node_and_gossip(
+            HealthInfo(infohash, seeders=200, leechers=0, last_check=int(time.time())))
         self.nodes[1].overlay.send_remote_select.assert_not_called()
diff --git a/src/tribler/core/components/restapi/rest/events_endpoint.py b/src/tribler/core/components/restapi/rest/events_endpoint.py
index 1d63c3286a3..30f4e9364e6 100644
--- a/src/tribler/core/components/restapi/rest/events_endpoint.py
+++ b/src/tribler/core/components/restapi/rest/events_endpoint.py
@@ -109,6 +109,7 @@ async def write_data(self, message):
         if not self.has_connection_to_gui():
             return
         try:
+            self._logger.debug(f'Write message: {message}')
             message_bytes = self.encode_message(message)
         except Exception as e:  # pylint: disable=broad-except
             # if a notification arguments contains non-JSON-serializable data, the exception should be logged
""" +DHT = "DHT" diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/dataclasses.py b/src/tribler/core/components/torrent_checker/torrent_checker/dataclasses.py new file mode 100644 index 00000000000..a25a909d34a --- /dev/null +++ b/src/tribler/core/components/torrent_checker/torrent_checker/dataclasses.py @@ -0,0 +1,66 @@ +import time +from dataclasses import dataclass, field +from typing import List + +import human_readable + +from tribler.core.utilities.unicode import hexlify + + +TOLERABLE_TIME_DRIFT = 60 # one minute +HOUR = 60 * 60 + + +@dataclass +class HealthInfo: + infohash: bytes = field(repr=False) + last_check: int + seeders: int = 0 + leechers: int = 0 + + def __repr__(self): + infohash_repr = hexlify(self.infohash[:4]) + age = self._last_check_repr(self.last_check) + return f"{self.__class__.__name__}('{infohash_repr}', {self.seeders}/{self.leechers}, {age})" + + @staticmethod + def _last_check_repr(last_check: int) -> str: + if last_check < 0: + return 'invalid time' + + if last_check == 0: + return 'never checked' + + now = int(time.time()) + diff = now - last_check + if diff == 0: + return 'just checked' + + age = human_readable.time_delta(diff, use_months=False) + return age + (' ago' if diff > 0 else ' in the future') + + @property + def infohash_hex(self): + return hexlify(self.infohash) + + def is_valid(self) -> bool: + return self.last_check < int(time.time()) + TOLERABLE_TIME_DRIFT + + def should_update(self, torrent_state, self_checked=False): + if self.last_check <= torrent_state.last_check: + # The torrent state in the DB is already fresher than this health + return False + + now = int(time.time()) + hour_ago = now - HOUR + if not self_checked and torrent_state.self_checked and hour_ago <= torrent_state.last_check <= now: + # The torrent state in the DB was locally checked just recently, + # and we trust this recent local check more than the new health info received remotely + return False + + return True + +@dataclass +class TrackerResponse: + url: str + torrent_health_list: List[HealthInfo] diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py index e7edf6187ab..df231c4cd65 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py @@ -2,21 +2,24 @@ import random import secrets import time -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock import pytest from ipv8.util import succeed from pony.orm import db_session import tribler.core.components.torrent_checker.torrent_checker.torrent_checker as torrent_checker_module +from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo, TrackerResponse +from tribler.core.components.torrent_checker.torrent_checker.utils import aggregate_responses_for_infohash, \ + filter_non_exceptions from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker -from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HttpTrackerSession, \ - UdpSocketManager +from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import \ + HttpTrackerSession, UdpSocketManager from tribler.core.components.torrent_checker.torrent_checker.tracker_manager import TrackerManager -from 
diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py
index e7edf6187ab..df231c4cd65 100644
--- a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py
+++ b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py
@@ -2,21 +2,24 @@
 import random
 import secrets
 import time
-from unittest.mock import MagicMock
+from unittest.mock import AsyncMock, MagicMock
 
 import pytest
 from ipv8.util import succeed
 from pony.orm import db_session
 
 import tribler.core.components.torrent_checker.torrent_checker.torrent_checker as torrent_checker_module
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo, TrackerResponse
+from tribler.core.components.torrent_checker.torrent_checker.utils import aggregate_responses_for_infohash, \
+    filter_non_exceptions
 from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker
-from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HttpTrackerSession, \
-    UdpSocketManager
+from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import \
+    HttpTrackerSession, UdpSocketManager
 from tribler.core.components.torrent_checker.torrent_checker.tracker_manager import TrackerManager
-from tribler.core.tests.tools.base_test import MockObject
-from tribler.core.utilities.unicode import hexlify
 
+# pylint: disable=protected-access
+
 
 @pytest.fixture
 def tracker_manager(tmp_path, metadata_store):
     return TrackerManager(state_dir=tmp_path, metadata_store=metadata_store)
@@ -24,7 +27,6 @@
 
 @pytest.fixture(name="torrent_checker")
 async def fixture_torrent_checker(tribler_config, tracker_manager, metadata_store):
-
     torrent_checker = TorrentChecker(config=tribler_config,
                                      download_manager=MagicMock(),
                                      notifier=MagicMock(),
@@ -35,19 +37,11 @@ async def fixture_torrent_checker(tribler_config, tracker_manager, metadata_stor
     await torrent_checker.shutdown()
 
 
-async def test_initialize(torrent_checker):  # pylint: disable=unused-argument
-    """
-    Test the initialization of the torrent checker
-    """
-    await torrent_checker.initialize()
-    assert torrent_checker.is_pending_task_active("tracker_check")
-    assert torrent_checker.is_pending_task_active("torrent_check")
-
-
 async def test_create_socket_fail(torrent_checker):
     """
     Test creation of the UDP socket of the torrent checker when it fails
     """
+
     def mocked_listen_on_udp():
         raise OSError("Something went wrong")
 
@@ -70,9 +64,8 @@ async def test_health_check_blacklisted_trackers(torrent_checker):
         torrent_checker.tracker_manager.blacklist.append("http://localhost/tracker")
 
     result = await torrent_checker.check_torrent_health(b'a' * 20)
-    assert {'db'} == set(result.keys())
-    assert result['db']['seeders'] == 5
-    assert result['db']['leechers'] == 10
+    assert result.seeders == 5
+    assert result.leechers == 10
 
 
 async def test_health_check_cached(torrent_checker):
@@ -85,9 +78,8 @@ async def test_health_check_cached(torrent_checker):
                                          last_check=int(time.time()))
 
     result = await torrent_checker.check_torrent_health(b'a' * 20)
-    assert 'db' in result
-    assert result['db']['seeders'] == 5
-    assert result['db']['leechers'] == 10
+    assert result.seeders == 5
+    assert result.leechers == 10
 
 
 def test_load_torrents_check_from_db(torrent_checker):  # pylint: disable=unused-argument
@@ -95,6 +87,7 @@ def test_load_torrents_check_from_db(torrent_checker):  # pylint: disable=unused
     Test if the torrents_checked set is properly initialized based on the last_check
     and self_checked values from the database.
     """
+
    @db_session
     def save_random_torrent_state(last_checked=0, self_checked=False, count=1):
         for _ in range(count):
@@ -112,16 +105,19 @@ def save_random_torrent_state(last_checked=0, self_checked=False, count=1):
     # Case 1: Save random 10 non-self checked torrents
     # Expected: empty set, since only self checked torrents are considered.
     save_random_torrent_state(last_checked=now, self_checked=False, count=10)
+    torrent_checker._torrents_checked = None  # pylint: disable=protected-access
     assert not torrent_checker.torrents_checked
 
     # Case 2: Save 10 self checked torrent but not within the freshness period
     # Expected: empty set, since only self checked fresh torrents are considered.
     save_random_torrent_state(last_checked=before_threshold, self_checked=True, count=10)
+    torrent_checker._torrents_checked = None  # pylint: disable=protected-access
     assert not torrent_checker.torrents_checked
 
     # Case 3: Save 10 self checked fresh torrents
     # Expected: 10 torrents, since there are 10 self checked and fresh torrents
     save_random_torrent_state(last_checked=after_threshold, self_checked=True, count=10)
+    torrent_checker._torrents_checked = None  # pylint: disable=protected-access
     assert len(torrent_checker.torrents_checked) == 10
 
     # Case 4: Save some more self checked fresh torrents
@@ -132,7 +128,7 @@ def save_random_torrent_state(last_checked=0, self_checked=False, count=1):
     # Case 5: Clear the torrent_checked set (private variable),
     # and save freshly self checked torrents more than max return size (10 more).
     # Expected: max (return size) torrents, since limit is placed on how many to load.
-    torrent_checker._torrents_checked = dict()  # pylint: disable=protected-access
+    torrent_checker._torrents_checked = None  # pylint: disable=protected-access
     return_size = torrent_checker_module.TORRENTS_CHECKED_RETURN_SIZE
     save_random_torrent_state(last_checked=after_threshold, self_checked=True, count=return_size + 10)
     assert len(torrent_checker.torrents_checked) == return_size
@@ -166,7 +162,7 @@ async def test_check_random_tracker_not_alive(torrent_checker):
     assert not result
 
     with db_session:
-        tracker = torrent_checker.tracker_manager.tracker_store.get()
+        tracker = torrent_checker.tracker_manager.TrackerState.get()
         assert not tracker.alive
 
 
@@ -184,11 +180,11 @@ async def test_task_select_tracker(torrent_checker):
 
     assert not result
     assert len(controlled_session.infohash_list) == 1
-
+    await controlled_session.cleanup()
 
-async def test_tracker_test_error_resolve(torrent_checker):
+async def test_tracker_test_error_resolve(torrent_checker: TorrentChecker):
     """
     Test whether we capture the error when a tracker check fails
     """
@@ -200,7 +196,7 @@ async def test_tracker_test_error_resolve(torrent_checker):
     assert not result
 
     # Verify whether we successfully cleaned up the session after an error
-    assert len(torrent_checker._session_list) == 1
+    assert not torrent_checker._sessions
 
 
 async def test_tracker_no_infohashes(torrent_checker):
@@ -216,10 +212,14 @@ def test_get_valid_next_tracker_for_auto_check(torrent_checker):
     """
     Test if only valid tracker url are used for auto check
     """
-    mock_tracker_state_invalid = MockObject()
-    mock_tracker_state_invalid.url = "http://anno nce.torrentsmd.com:8080/announce"
-    mock_tracker_state_valid = MockObject()
-    mock_tracker_state_valid.url = "http://announce.torrentsmd.com:8080/announce"
+    mock_tracker_state_invalid = MagicMock(
+        url="http://anno nce.torrentsmd.com:8080/announce",
+        failures=0
+    )
+    mock_tracker_state_valid = MagicMock(
+        url="http://announce.torrentsmd.com:8080/announce",
+        failures=0
+    )
     tracker_states = [mock_tracker_state_invalid, mock_tracker_state_valid]
 
     def get_next_tracker_for_auto_check():
@@ -228,69 +228,52 @@ def get_next_tracker_for_auto_check():
     def remove_tracker(_):
         tracker_states.remove(mock_tracker_state_invalid)
 
-    torrent_checker.get_next_tracker_for_auto_check = get_next_tracker_for_auto_check
-    torrent_checker.remove_tracker = remove_tracker
-
-    next_tracker = torrent_checker.get_valid_next_tracker_for_auto_check()
+    torrent_checker.tracker_manager.get_next_tracker = get_next_tracker_for_auto_check
+    torrent_checker.tracker_manager.remove_tracker = remove_tracker
+    next_tracker = torrent_checker.get_next_tracker()
     assert len(tracker_states) == 1
     assert next_tracker.url == "http://announce.torrentsmd.com:8080/announce"
 
 
-def test_on_health_check_completed(torrent_checker):
-    tracker1 = 'udp://localhost:2801'
-    tracker2 = "http://badtracker.org/announce"
-    infohash_bin = b'\xee'*20
-    infohash_hex = hexlify(infohash_bin)
+def test_filter_non_exceptions():
+    response = TrackerResponse(url='url', torrent_health_list=[])
+    responses = [response, Exception()]
+
+    assert filter_non_exceptions(responses) == [response]
 
-    exception = Exception()
-    exception.tracker_url = tracker2
-    result = [
-        {tracker1: [{'leechers': 1, 'seeders': 2, 'infohash': infohash_hex}]},
-        exception,
-        {'DHT': [{'leechers': 12, 'seeders': 13, 'infohash': infohash_hex}]}
+
+def test_update_health(torrent_checker: TorrentChecker):
+    infohash = b'\xee' * 20
+
+    now = int(time.time())
+    responses = [
+        TrackerResponse(
+            url='udp://localhost:2801',
+            torrent_health_list=[HealthInfo(infohash, last_check=now, leechers=1, seeders=2)]
+        ),
+        TrackerResponse(
+            url='DHT',
+            torrent_health_list=[HealthInfo(infohash, last_check=now, leechers=12, seeders=13)]
+        ),
     ]
 
-    # Check that everything works fine even if the database contains no proper infohash
-    res_dict = {
-        'DHT': {
-            'leechers': 12,
-            'seeders': 13,
-            'infohash': infohash_hex
-        },
-        'http://badtracker.org/announce': {
-            'error': ''
-        },
-        'udp://localhost:2801': {
-            'leechers': 1,
-            'seeders': 2,
-            'infohash': infohash_hex
-        }
-    }
-    torrent_checker.on_torrent_health_check_completed(infohash_bin, result)
-    assert torrent_checker.on_torrent_health_check_completed(infohash_bin, result) == res_dict
-    assert not torrent_checker.on_torrent_health_check_completed(infohash_bin, None)
-
-    with db_session:
-        ts = torrent_checker.mds.TorrentState(infohash=infohash_bin)
-        previous_check = ts.last_check
-        torrent_checker.on_torrent_health_check_completed(infohash_bin, result)
-        assert 1 == len(torrent_checker.torrents_checked)
-        assert result[2]['DHT'][0]['leechers'] == ts.leechers
-        assert result[2]['DHT'][0]['seeders'] == ts.seeders
-        assert previous_check < ts.last_check
+    health = aggregate_responses_for_infohash(infohash, responses)
 
+    # Check that everything works fine even if the database contains no proper infohash
+    updated = torrent_checker.update_torrent_health(health)
+    assert not updated
 
-def test_on_health_check_failed(torrent_checker):
-    """
-    Check whether there is no crash when the torrent health check failed and the response is None
-    No torrent info is added to torrent_checked list.
- """ - infohash_bin = b'\xee' * 20 - torrent_checker.on_torrent_health_check_completed(infohash_bin, [None]) - assert 0 == len(torrent_checker.torrents_checked) + with db_session: + ts = torrent_checker.mds.TorrentState(infohash=infohash) + updated = torrent_checker.update_torrent_health(health) + assert updated + assert len(torrent_checker.torrents_checked) == 1 + assert ts.leechers == 12 + assert ts.seeders == 13 + assert ts.last_check == now -@db_session -def test_check_local_torrents(torrent_checker): +async def test_check_local_torrents(torrent_checker): """ Test that the random torrent health checking mechanism picks the right torrents """ @@ -302,7 +285,7 @@ def random_infohash(): torrent_checker.check_torrent_health = lambda _: succeed(None) # No torrents yet, the selected torrents should be empty - selected_torrents = torrent_checker.check_local_torrents() + selected_torrents, _ = await torrent_checker.check_local_torrents() assert len(selected_torrents) == 0 # Add some freshly checked torrents @@ -310,10 +293,11 @@ def random_infohash(): fresh_infohashes = [] for index in range(0, num_torrents): infohash = random_infohash() - torrent = torrent_checker.mds.TorrentMetadata(title=f'torrent{index}', infohash=infohash) - torrent.health.seeders = index - torrent.health.last_check = int(time_fresh) + index - fresh_infohashes.append(infohash) + with db_session: + torrent = torrent_checker.mds.TorrentMetadata(title=f'torrent{index}', infohash=infohash) + torrent.health.seeders = index + torrent.health.last_check = int(time_fresh) + index + fresh_infohashes.append(infohash) # Add some stale (old) checked torrents time_stale = time_fresh - torrent_checker_module.HEALTH_FRESHNESS_SECONDS @@ -321,13 +305,14 @@ def random_infohash(): max_seeder = 10000 # some random value for index in range(0, num_torrents): infohash = random_infohash() - torrent = torrent_checker.mds.TorrentMetadata(title=f'torrent{index}', infohash=infohash) - torrent.health.seeders = max_seeder - index # Note: decreasing trend - torrent.health.last_check = int(time_stale) - index # Note: decreasing trend + with db_session: + torrent = torrent_checker.mds.TorrentMetadata(title=f'torrent{index}', infohash=infohash) + torrent.health.seeders = max_seeder - index # Note: decreasing trend + torrent.health.last_check = int(time_stale) - index # Note: decreasing trend stale_infohashes.append(infohash) # Now check that all torrents selected for check are stale torrents. - selected_torrents = torrent_checker.check_local_torrents() + selected_torrents, _ = await torrent_checker.check_local_torrents() assert len(selected_torrents) <= torrent_checker_module.TORRENT_SELECTION_POOL_SIZE # In the above setup, both seeder (popularity) count and last_check are decreasing so, @@ -335,14 +320,13 @@ def random_infohash(): # 2. Older torrents are towards the back # Therefore the selection range becomes: selection_range = stale_infohashes[0: torrent_checker_module.TORRENT_SELECTION_POOL_SIZE] \ - + stale_infohashes[- torrent_checker_module.TORRENT_SELECTION_POOL_SIZE:] + + stale_infohashes[- torrent_checker_module.TORRENT_SELECTION_POOL_SIZE:] - for infohash in selected_torrents: - assert infohash in selection_range + for t in selected_torrents: + assert t.infohash in selection_range -@db_session -def test_check_channel_torrents(torrent_checker): +async def test_check_channel_torrents(torrent_checker: TorrentChecker): """ Test that the channel torrents are checked based on last checked time. Only outdated torrents are selected for health checks. 
@@ -351,13 +335,14 @@ def test_check_channel_torrents(torrent_checker):
     def random_infohash():
         return os.urandom(20)

+    @db_session
     def add_torrent_to_channel(infohash, last_check):
         torrent = torrent_checker.mds.TorrentMetadata(public_key=torrent_checker.mds.my_public_key_bin,
                                                       infohash=infohash)
         torrent.health.last_check = last_check
         return torrent

-    check_torrent_health_mock = MagicMock(return_value=None)
+    check_torrent_health_mock = AsyncMock(return_value=None)
     torrent_checker.check_torrent_health = lambda _: check_torrent_health_mock()

     # No torrents yet in channel, the selected channel torrents to check should be empty
@@ -365,7 +350,7 @@ def add_torrent_to_channel(infohash, last_check):
     assert len(selected_torrents) == 0

     # No health check calls are done
-    torrent_checker.check_torrents_in_user_channel()
+    await torrent_checker.check_torrents_in_user_channel()
     assert check_torrent_health_mock.call_count == len(selected_torrents)

     num_torrents = 20
@@ -381,14 +366,14 @@ def add_torrent_to_channel(infohash, last_check):
     outdated_torrents = []
     for _ in range(num_torrents):
         torrent = add_torrent_to_channel(random_infohash(), last_check=timestamp_outdated)
-        outdated_torrents.append(torrent)
+        outdated_torrents.append(torrent.infohash)

     # Now check that only outdated torrents are selected for check
     selected_torrents = torrent_checker.torrents_to_check_in_user_channel()
     assert len(selected_torrents) <= torrent_checker_module.USER_CHANNEL_TORRENT_SELECTION_POOL_SIZE
     for torrent in selected_torrents:
-        assert torrent in outdated_torrents
+        assert torrent.infohash in outdated_torrents

     # Health check requests are sent for all selected torrents
-    torrent_checker.check_torrents_in_user_channel()
-    assert check_torrent_health_mock.call_count == len(selected_torrents)
+    result = await torrent_checker.check_torrents_in_user_channel()
+    assert len(result) == len(selected_torrents)
diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker_session.py b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker_session.py
index e8125d906a1..e2588eefe21 100644
--- a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker_session.py
+++ b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker_session.py
@@ -1,25 +1,20 @@
 import socket
 import struct
+import time
 from asyncio import CancelledError, DatagramProtocol, Future, ensure_future, get_event_loop, sleep, start_server
 from unittest.mock import Mock

+import pytest
 from aiohttp.web_exceptions import HTTPBadRequest
-
 from ipv8.util import succeed
-
 from libtorrent import bencode

-import pytest
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
+from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import \
+    FakeBep33DHTSession, FakeDHTSession, HttpTrackerSession, UdpSocketManager, UdpTrackerSession

-from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import (
-    FakeBep33DHTSession,
-    FakeDHTSession,
-    HttpTrackerSession,
-    UdpSocketManager,
-    UdpTrackerSession,
-)
-from tribler.core.utilities.unicode import hexlify
+# pylint: disable=redefined-outer-name


 class FakeUdpSocketManager:
     transport = 1
@@ -39,14 +34,14 @@ def fixture_fake_udp_socket_manager():

 @pytest.fixture
 async def bep33_session(mock_dlmgr):
-    bep33_dht_session = FakeBep33DHTSession(mock_dlmgr, b'a' * 20, 10)
+    bep33_dht_session = FakeBep33DHTSession(mock_dlmgr, 10)
     yield bep33_dht_session
     await bep33_dht_session.cleanup()


 @pytest.fixture
 async def fake_dht_session(mock_dlmgr):
-    fake_dht_session = FakeDHTSession(mock_dlmgr, b'a' * 20, 10)
+    fake_dht_session = FakeDHTSession(mock_dlmgr, 10)
     yield fake_dht_session
     await fake_dht_session.shutdown_task_manager()
@@ -74,6 +69,7 @@ async def test_httpsession_code_not_200():

     def fake_request(_):
         raise HTTPBadRequest()
+
     session._session.get = fake_request

     with pytest.raises(Exception):
@@ -340,20 +336,24 @@ async def test_connect_to_tracker(mock_dlmgr, fake_dht_session):
     """
     metainfo = {b'seeders': 42, b'leechers': 42}
     mock_dlmgr.get_metainfo = lambda *_, **__: succeed(metainfo)
+    fake_dht_session.add_infohash(b'a' * 20)
+    response = await fake_dht_session.connect_to_tracker()

-    metainfo = await fake_dht_session.connect_to_tracker()
+    assert response.url == 'DHT'
+    assert len(response.torrent_health_list) == 1

-    assert 'DHT' in metainfo
-    assert metainfo['DHT'][0]['leechers'] == 42
-    assert metainfo['DHT'][0]['seeders'] == 42
+    health = response.torrent_health_list[0]
+    assert health.leechers == 42
+    assert health.seeders == 42


 async def test_connect_to_tracker_fail(mock_dlmgr, fake_dht_session):
     """
     Test the metainfo lookup of the DHT session when it fails
     """
-    mock_dlmgr.get_metainfo = lambda *_, **__: succeed(None)
-    with pytest.raises(RuntimeError):
+    mock_dlmgr.get_metainfo = Mock(side_effect=TimeoutError)
+    fake_dht_session.add_infohash(b'a' * 20)
+    with pytest.raises(TimeoutError):
         await fake_dht_session.connect_to_tracker()


@@ -361,24 +361,16 @@ async def test_connect_to_tracker_bep33(bep33_session, mock_dlmgr):
     """
     Test the metainfo lookup of the BEP33 DHT session
     """
-    dht_health_dict = {
-        "infohash": hexlify(b'a' * 20),
-        "seeders": 1,
-        "leechers": 2
-    }
-    mock_dlmgr.dht_health_manager = Mock()
-    mock_dlmgr.dht_health_manager.get_health = lambda *_, **__: succeed({"DHT": [dht_health_dict]})
+    infohash = b'a' * 20
+    infohash_health = HealthInfo(infohash, last_check=int(time.time()), seeders=1, leechers=2)

-    metainfo = await bep33_session.connect_to_tracker()
-
-    assert 'DHT' in metainfo
-    assert metainfo['DHT'][0]['leechers'] == 2
-    assert metainfo['DHT'][0]['seeders'] == 1
-
-
-def test_methods(bep33_session):
-    """
-    Test various methods in the DHT session class
-    """
-    bep33_session.add_infohash('b' * 20)
-    assert bep33_session.infohash == 'b' * 20
+    mock_dlmgr.dht_health_manager = Mock()
+    mock_dlmgr.dht_health_manager.get_health = lambda *_, **__: succeed(infohash_health)
+    bep33_session.add_infohash(infohash)
+    response = await bep33_session.connect_to_tracker()
+    assert response.url == 'DHT'
+    assert len(response.torrent_health_list) == 1
+
+    health = response.torrent_health_list[0]
+    assert health.leechers == 2
+    assert health.seeders == 1
diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_tracker_manager.py b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_tracker_manager.py
index 5a92a080398..12538e5b7d9 100644
--- a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_tracker_manager.py
+++ b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_tracker_manager.py
@@ -62,21 +62,21 @@ def test_get_tracker_for_check(tracker_manager):
     """
     Test whether the correct tracker is returned when fetching the next eligible tracker for the auto check
     """
-    assert not tracker_manager.get_next_tracker_for_auto_check()
+    assert not tracker_manager.get_next_tracker()

     tracker_manager.add_tracker("http://test1.com:80/announce")
-    assert tracker_manager.get_next_tracker_for_auto_check().url == 'http://test1.com/announce'
+    assert tracker_manager.get_next_tracker().url == 'http://test1.com/announce'


 def test_get_tracker_for_check_blacklist(tracker_manager):
     """
     Test whether the next tracker for autocheck is not in the blacklist
     """
-    assert not tracker_manager.get_next_tracker_for_auto_check()
+    assert not tracker_manager.get_next_tracker()

     tracker_manager.add_tracker("http://test1.com:80/announce")
     tracker_manager.blacklist.append("http://test1.com/announce")
-    assert not tracker_manager.get_next_tracker_for_auto_check()
+    assert not tracker_manager.get_next_tracker()


 def test_load_blacklist_from_file_none(tracker_manager):
diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py b/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py
index 2fea2b821f5..be466007011 100644
--- a/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py
+++ b/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py
@@ -2,23 +2,24 @@
 import logging
 import random
 import time
-from asyncio import CancelledError, gather
-from typing import List, Optional
-
-from ipv8.taskmanager import TaskManager, task
+from asyncio import CancelledError
+from collections import defaultdict
+from typing import Dict, List, Optional, Tuple, Union

+from ipv8.taskmanager import TaskManager
 from pony.orm import db_session, desc, select
+from pony.utils import between

 from tribler.core import notifications
 from tribler.core.components.libtorrent.download_manager.download_manager import DownloadManager
 from tribler.core.components.metadata_store.db.serialization import REGULAR_TORRENT
 from tribler.core.components.metadata_store.db.store import MetadataStore
-from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import (
-    FakeBep33DHTSession,
-    FakeDHTSession,
-    UdpSocketManager,
-    create_tracker_session,
-)
+from tribler.core.components.torrent_checker.torrent_checker import DHT
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo, TrackerResponse
+from tribler.core.components.torrent_checker.torrent_checker.utils import aggregate_responses_for_infohash, \
+    filter_non_exceptions, gather_coros, aggregate_health_by_infohash
+from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import \
+    FakeBep33DHTSession, FakeDHTSession, TrackerSession, UdpSocketManager, create_tracker_session
 from tribler.core.components.torrent_checker.torrent_checker.tracker_manager import MAX_TRACKER_FAILURES, TrackerManager
 from tribler.core.config.tribler_config import TriblerConfig
 from tribler.core.utilities.notifier import Notifier
@@ -26,9 +27,9 @@
 from tribler.core.utilities.unicode import hexlify
 from tribler.core.utilities.utilities import has_bep33_support, is_valid_url

-TRACKER_SELECTION_INTERVAL = 20  # The interval for querying a random tracker
+TRACKER_SELECTION_INTERVAL = 1  # The interval for querying a random tracker
 TORRENT_SELECTION_INTERVAL = 120  # The interval for checking the health of a random torrent
-USER_CHANNEL_TORRENT_SELECTION_INTERVAL = 15  # The interval for checking the health of torrents in user's channel.
+USER_CHANNEL_TORRENT_SELECTION_INTERVAL = 10 * 60  # The interval for checking the health of torrents in user's channel.
 MIN_TORRENT_CHECK_INTERVAL = 900  # How much time we should wait before checking a torrent again
 TORRENT_CHECK_RETRY_INTERVAL = 30  # Interval when the torrent was successfully checked for the last time
 MAX_TORRENTS_CHECKED_PER_SESSION = 50
@@ -40,7 +41,6 @@


 class TorrentChecker(TaskManager):
-
     def __init__(self,
                  config: TriblerConfig,
                  download_manager: DownloadManager,
@@ -52,26 +52,25 @@ def __init__(self,
         self._logger = logging.getLogger(self.__class__.__name__)
         self.tracker_manager = tracker_manager
         self.mds = metadata_store
-        self.dlmgr = download_manager
+        self.download_manager = download_manager
         self.notifier = notifier
         self.config = config

         self.socks_listen_ports = socks_listen_ports

         self._should_stop = False
-        self._session_list = {'DHT': []}
-
+        self._sessions = defaultdict(list)
         self.socket_mgr = UdpSocketManager()
         self.udp_transport = None

         # We keep track of the results of popular torrents checked by you.
         # The popularity community gossips this information around.
-        self._torrents_checked = dict()
+        self._torrents_checked: Optional[Dict[bytes, HealthInfo]] = None

     async def initialize(self):
-        self.register_task("tracker_check", self.check_random_tracker, interval=TRACKER_SELECTION_INTERVAL)
-        self.register_task("torrent_check", self.check_local_torrents, interval=TORRENT_SELECTION_INTERVAL)
-        self.register_task("user_channel_torrent_check", self.check_torrents_in_user_channel,
+        self.register_task("check random tracker", self.check_random_tracker, interval=TRACKER_SELECTION_INTERVAL)
+        self.register_task("check local torrents", self.check_local_torrents, interval=TORRENT_SELECTION_INTERVAL)
+        self.register_task("check channel torrents", self.check_torrents_in_user_channel,
                            interval=USER_CHANNEL_TORRENT_SELECTION_INTERVAL)
         await self.create_socket_or_schedule()
@@ -114,85 +113,92 @@ async def check_random_tracker(self):
         """
         if self._should_stop:
             self._logger.warning("Not performing tracker check since we are shutting down")
-            return False
-
-        tracker = self.get_valid_next_tracker_for_auto_check()
-        if tracker is None:
-            self._logger.warning("No tracker to select from, skip")
-            return False
+            return

-        self._logger.debug("Start selecting torrents on tracker %s.", tracker.url)
+        tracker = self.get_next_tracker()
+        if not tracker:
+            self._logger.warning("No tracker to select from to check torrent health, skip")
+            return

         # get the torrents that should be checked
+        url = tracker.url
         with db_session:
             dynamic_interval = TORRENT_CHECK_RETRY_INTERVAL * (2 ** tracker.failures)
-            # FIXME: this is a really dumb fix for update_tracker_info not being called in some cases
-            if tracker.failures >= MAX_TRACKER_FAILURES:
-                self.update_tracker_info(tracker.url, False)
-                return False
             torrents = select(ts for ts in tracker.torrents if ts.last_check + dynamic_interval < int(time.time()))
             infohashes = [t.infohash for t in torrents[:MAX_TORRENTS_CHECKED_PER_SESSION]]

         if len(infohashes) == 0:
             # We have no torrent to recheck for this tracker. Still update the last_check for this tracker.
-            self._logger.info("No torrent to check for tracker %s", tracker.url)
-            self.update_tracker_info(tracker.url, True)
-            return False
+            self._logger.info(f"No torrent to check for tracker {url}")
+            self.tracker_manager.update_tracker_info(url)
+            return

         try:
-            session = self._create_session_for_request(tracker.url, timeout=30)
-            if session is None:
-                return False
+            session = self._create_session_for_request(url, timeout=30)
         except MalformedTrackerURLException as e:
+            session = None
             # Remove the tracker from the database
-            self.remove_tracker(tracker.url)
-            self._logger.error(e)
-            return False
+            self.tracker_manager.remove_tracker(url)
+            self._logger.warning(e)
+
+        if session is None:
+            self._logger.warning('A session cannot be created. The torrent check procedure has been cancelled.')
+            return

         # We shuffle the list so that different infohashes are checked on subsequent scrape requests if the total
         # number of infohashes exceeds the maximum number of infohashes we check.
         random.shuffle(infohashes)
         for infohash in infohashes:
             session.add_infohash(infohash)

-        self._logger.info("Selected %d new torrents to check on tracker: %s", len(infohashes), tracker.url)
+        self._logger.info(f"Selected {len(infohashes)} new torrents to check on random tracker: {url}")
         try:
-            await self.connect_to_tracker(session)
-            return True
-        except:
-            return False
+            response = await self.get_tracker_response(session)
+        except Exception as e:  # pylint: disable=broad-except
+            self._logger.warning(e)
+        else:
+            health_list = response.torrent_health_list
+            self._logger.info(f"Received {len(health_list)} health info results from tracker: {health_list}")
+            for health in aggregate_health_by_infohash(health_list):
+                self.update_torrent_health(health)

-    async def connect_to_tracker(self, session):
+    async def get_tracker_response(self, session: TrackerSession) -> TrackerResponse:
         try:
-            info_dict = await session.connect_to_tracker()
-            return await self._on_result_from_session(session, info_dict)
+            return await session.connect_to_tracker()
         except CancelledError:
-            self._logger.info("Tracker session is being cancelled (url %s)", session.tracker_url)
-            await self.clean_session(session)
+            self._logger.info(f"Tracker session is being cancelled: {session.tracker_url}")
+            raise
         except Exception as e:
-            self._logger.warning("Got session error for URL %s: %s", session.tracker_url, str(e).replace('\n]', ']'))
-            await self.clean_session(session)
+            exception_str = str(e).replace('\n]', ']')
+            self._logger.warning(f"Got session error for the tracker: {session.tracker_url}\n{exception_str}")
             self.tracker_manager.update_tracker_info(session.tracker_url, False)
-            e.tracker_url = session.tracker_url
             raise e
+        finally:
+            await self.clean_session(session)

     @property
-    def torrents_checked(self):
-        if not self._torrents_checked:
-            self.load_torrents_checked_from_db()
-        return self._torrents_checked.values()
+    def torrents_checked(self) -> Dict[bytes, HealthInfo]:
+        if self._torrents_checked is None:
+            self._torrents_checked = self.load_torrents_checked_from_db()
+            lines = '\n'.join(f'    {health}' for health in sorted(self._torrents_checked.values(),
+                                                                   key=lambda health: -health.last_check))
+            self._logger.info(f'Initially loaded self-checked torrents:\n{lines}')
+        return self._torrents_checked

     @db_session
-    def load_torrents_checked_from_db(self):
-        last_fresh_time = time.time() - HEALTH_FRESHNESS_SECONDS
+    def load_torrents_checked_from_db(self) -> Dict[bytes, HealthInfo]:
+        result = {}
+        now = int(time.time())
+        last_fresh_time = now - HEALTH_FRESHNESS_SECONDS
         checked_torrents = list(self.mds.TorrentState
-                                .select(lambda g: g.has_data and g.last_check > last_fresh_time and g.self_checked)
+                                .select(lambda g: g.has_data and g.self_checked
+                                                  and between(g.last_check, last_fresh_time, now))
                                 .order_by(lambda g: (desc(g.seeders), g.last_check))
                                 .limit(TORRENTS_CHECKED_RETURN_SIZE))

         for torrent in checked_torrents:
-            self._torrents_checked[torrent.infohash] = (torrent.infohash, torrent.seeders, torrent.leechers,
-                                                        torrent.last_check)
+            result[torrent.infohash] = HealthInfo(
+                torrent.infohash, seeders=torrent.seeders, leechers=torrent.leechers, last_check=torrent.last_check)
+        return result

     @db_session
     def torrents_to_check(self):
@@ -219,18 +225,16 @@ def torrents_to_check(self):
         selected_torrents = random.sample(selected_torrents, min(TORRENT_SELECTION_POOL_SIZE, len(selected_torrents)))
         return selected_torrents

-    @db_session
-    def check_local_torrents(self):
+    async def check_local_torrents(self) -> Tuple[List, List]:
         """
         Perform a full health check on a few popular and old torrents in the database.
         """
         selected_torrents = self.torrents_to_check()
-
-        infohashes = []
-        for random_torrent in selected_torrents:
-            self.check_torrent_health(bytes(random_torrent.infohash))
-            infohashes.append(random_torrent.infohash)
-        return infohashes
+        self._logger.info(f'Check {len(selected_torrents)} local torrents')
+        coros = [self.check_torrent_health(t.infohash) for t in selected_torrents]
+        results = await gather_coros(coros)
+        self._logger.info(f'Results for local torrents check: {results}')
+        return selected_torrents, results

     @db_session
     def torrents_to_check_in_user_channel(self):
@@ -247,193 +251,146 @@ def torrents_to_check_in_user_channel(self):
                                 .limit(USER_CHANNEL_TORRENT_SELECTION_POOL_SIZE))
         return channel_torrents

-    @db_session
-    def check_torrents_in_user_channel(self):
+    async def check_torrents_in_user_channel(self) -> List[Union[HealthInfo, BaseException]]:
         """
         Perform a full health check of torrents in user's channel
         """
-        for channel_torrent in self.torrents_to_check_in_user_channel():
-            self.check_torrent_health(channel_torrent.infohash)
-
-    def get_valid_next_tracker_for_auto_check(self):
-        tracker = self.get_next_tracker_for_auto_check()
-        while tracker and not is_valid_url(tracker.url):
-            self.remove_tracker(tracker.url)
-            tracker = self.get_next_tracker_for_auto_check()
-        return tracker
-
-    def get_next_tracker_for_auto_check(self):
-        return self.tracker_manager.get_next_tracker_for_auto_check()
-
-    def remove_tracker(self, tracker_url):
-        self.tracker_manager.remove_tracker(tracker_url)
-
-    def update_tracker_info(self, tracker_url, is_successful):
-        self.tracker_manager.update_tracker_info(tracker_url, is_successful)
+        selected_torrents = self.torrents_to_check_in_user_channel()
+        self._logger.info(f'Check {len(selected_torrents)} torrents in user channel')
+        coros = [self.check_torrent_health(t.infohash) for t in selected_torrents]
+        results = await gather_coros(coros)
+        self._logger.info(f'Results for torrents in user channel: {results}')
+        return results
+
+    def get_next_tracker(self):
+        while tracker := self.tracker_manager.get_next_tracker():
+            url = tracker.url
+
+            if not is_valid_url(url):
+                self.tracker_manager.remove_tracker(url)
+            elif tracker.failures >= MAX_TRACKER_FAILURES:
+                self.tracker_manager.update_tracker_info(url, is_successful=False)
+            else:
+                return tracker
+
+        return None

     def is_blacklisted_tracker(self, tracker_url):
         return tracker_url in self.tracker_manager.blacklist

     @db_session
-    def get_valid_trackers_of_torrent(self, torrent_id):
+    def get_valid_trackers_of_torrent(self, infohash):
         """ Get a set of valid trackers for torrent. Also remove any invalid tracker. """
-        db_tracker_list = self.mds.TorrentState.get(infohash=torrent_id).trackers
+        db_tracker_list = self.mds.TorrentState.get(infohash=infohash).trackers
         return {tracker.url for tracker in db_tracker_list
                 if is_valid_url(tracker.url) and not self.is_blacklisted_tracker(tracker.url)}

-    def update_torrents_checked(self, new_result):
-        """
-        Update the set with torrents that we have checked ourselves.
-        """
-        infohash = new_result['infohash']
-        seeders = new_result['seeders']
-        new_result_tuple = (infohash, seeders, new_result['leechers'], new_result['last_check'])
-
-        if seeders > 0:
-            self._torrents_checked[infohash] = new_result_tuple
-
-    def on_torrent_health_check_completed(self, infohash, result):
-        final_response = {}
-        if not result or not isinstance(result, list):
-            self._logger.info("Received invalid torrent checker result")
-            self.notifier[notifications.channel_entity_updated]({"infohash": hexlify(infohash),
-                                                                 "num_seeders": 0,
-                                                                 "num_leechers": 0,
-                                                                 "last_tracker_check": int(time.time()),
-                                                                 "health": "updated"})
-            return final_response
-
-        torrent_update_dict = {'infohash': infohash, 'seeders': 0, 'leechers': 0, 'last_check': int(time.time())}
-        for response in reversed(result):
-            if isinstance(response, Exception):
-                final_response[response.tracker_url] = {'error': str(response)}
-                continue
-            elif response is None:
-                self._logger.warning("Torrent health response is none!")
-                continue
-            response_keys = list(response.keys())
-            final_response[response_keys[0]] = response[response_keys[0]][0]
-
-            s = response[response_keys[0]][0]['seeders']
-            l = response[response_keys[0]][0]['leechers']
-
-            # More leeches is better, because undefined peers are marked as leeches in DHT
-            if s > torrent_update_dict['seeders'] or \
-                    (s == torrent_update_dict['seeders'] and l > torrent_update_dict['leechers']):
-                torrent_update_dict['seeders'] = s
-                torrent_update_dict['leechers'] = l
-
-        self._update_torrent_result(torrent_update_dict)
-        self.update_torrents_checked(torrent_update_dict)
-
-        # TODO: DRY! Stop doing lots of formats, just make REST endpoint automatically encode binary data to hex!
-        self.notifier[notifications.channel_entity_updated]({"infohash": hexlify(infohash),
-                                                             "num_seeders": torrent_update_dict["seeders"],
-                                                             "num_leechers": torrent_update_dict["leechers"],
-                                                             "last_tracker_check": torrent_update_dict["last_check"],
-                                                             "health": "updated"})
-        return final_response
-
-    @task
-    async def check_torrent_health(self, infohash, timeout=20, scrape_now=False):
+    async def check_torrent_health(self, infohash: bytes, timeout=20, scrape_now=False) -> HealthInfo:
         """
         Check the health of a torrent with a given infohash.
         :param infohash: Torrent infohash.
         :param timeout: The timeout to use in the performed requests
         :param scrape_now: Flag whether we want to force scraping immediately
         """
+        infohash_hex = hexlify(infohash)
+        self._logger.info(f'Check health for the torrent: {infohash_hex}')
         tracker_set = []

         # We first check whether the torrent is already in the database and checked before
         with db_session:
-            result = self.mds.TorrentState.get(infohash=infohash)
-            if result:
-                torrent_id = result.infohash
-                last_check = result.last_check
+            torrent_state = self.mds.TorrentState.get(infohash=infohash)
+            if torrent_state:
+                last_check = torrent_state.last_check
                 time_diff = time.time() - last_check
                 if time_diff < MIN_TORRENT_CHECK_INTERVAL and not scrape_now:
-                    self._logger.debug("time interval too short, not doing torrent health check for %s",
-                                       hexlify(infohash))
-                    return {
-                        "db": {
-                            "seeders": result.seeders,
-                            "leechers": result.leechers,
-                            "infohash": hexlify(infohash)
-                        }
-                    }
+                    self._logger.info(f"Time interval too short, not doing torrent health check for {infohash_hex}")
+                    return torrent_state.to_health()

                 # get torrent's tracker list from DB
-                tracker_set = self.get_valid_trackers_of_torrent(torrent_id)
+                tracker_set = self.get_valid_trackers_of_torrent(torrent_state.infohash)
+                self._logger.info(f'Trackers for {infohash_hex}: {tracker_set}')

-        tasks = []
+        coros = []
         for tracker_url in tracker_set:
-            session = self._create_session_for_request(tracker_url, timeout=timeout)
-            if session is None:
-                return False
-            session.add_infohash(infohash)
-            tasks.append(self.connect_to_tracker(session))
-
-        if has_bep33_support():
-            # Create a (fake) DHT session for the lookup if we have support for BEP33.
-            session = FakeBep33DHTSession(self.dlmgr, infohash, timeout)
-        else:
-            # Otherwise, fallback on the normal DHT metainfo lookups.
-            session = FakeDHTSession(self.dlmgr, infohash, timeout)
-
-        self._session_list['DHT'].append(session)
-        tasks.append(self.connect_to_tracker(session))
-
-        res = await gather(*tasks, return_exceptions=True)
-        return self.on_torrent_health_check_completed(infohash, res)
-
-    def _create_session_for_request(self, tracker_url, timeout=20):
-        hops = self.config.download_defaults.number_hops
-        if hops > len(self.socks_listen_ports or []):
-            # Proxies never started, dropping the request
+            if session := self._create_session_for_request(tracker_url, timeout=timeout):
+                session.add_infohash(infohash)
+                coros.append(self.get_tracker_response(session))
+
+        session_cls = FakeBep33DHTSession if has_bep33_support() else FakeDHTSession
+        session = session_cls(self.download_manager, timeout)
+        session.add_infohash(infohash)
+        self._logger.info(f'DHT session has been created for {infohash_hex}: {session}')
+        self._sessions[DHT].append(session)
+
+        coros.append(self.get_tracker_response(session))
+        responses = await gather_coros(coros)
+
+        self._logger.info(f'{len(responses)} responses for {infohash_hex} have been received: {responses}')
+        successful_responses = filter_non_exceptions(responses)
+        health = aggregate_responses_for_infohash(infohash, successful_responses)
+        self.update_torrent_health(health)
+        return health
+
+    def _create_session_for_request(self, tracker_url, timeout=20) -> Optional[TrackerSession]:
+        self._logger.debug(f'Creating a session for the request: {tracker_url}')
+
+        required_hops = self.config.download_defaults.number_hops
+        actual_hops = len(self.socks_listen_ports or [])
+        if required_hops > actual_hops:
+            self._logger.warning(f'Dropping the request: the required number of hops has not been reached. '
+                                 f'Required hops: {required_hops}. Actual hops: {actual_hops}')
             return None

-        proxy = ('127.0.0.1', self.socks_listen_ports[hops - 1]) if hops > 0 else None
+        proxy = ('127.0.0.1', self.socks_listen_ports[required_hops - 1]) if required_hops > 0 else None
         session = create_tracker_session(tracker_url, timeout, proxy, self.socket_mgr)
-
-        if tracker_url not in self._session_list:
-            self._session_list[tracker_url] = []
-        self._session_list[tracker_url].append(session)
-
-        self._logger.debug("Session created for tracker %s", tracker_url)
+        self._logger.info(f'Tracker session has been created: {session}')
+        self._sessions[tracker_url].append(session)
         return session

     async def clean_session(self, session):
-        self.tracker_manager.update_tracker_info(session.tracker_url, not session.is_failed)
+        url = session.tracker_url
+
+        self.tracker_manager.update_tracker_info(url, not session.is_failed)
         # Remove the session from our session list dictionary
-        self._session_list[session.tracker_url].remove(session)
-        if len(self._session_list[session.tracker_url]) == 0 and session.tracker_url != "DHT":
-            del self._session_list[session.tracker_url]
+        self._sessions[url].remove(session)
+        if len(self._sessions[url]) == 0 and url != DHT:
+            del self._sessions[url]

         await session.cleanup()
+        self._logger.debug('Session has been cleaned up')

-    async def _on_result_from_session(self, session, result_list):
-        await self.clean_session(session)
-        # FIXME: this should be probably handled by cancel, etc
-        if self._should_stop:
-            return
+    def update_torrent_health(self, health: HealthInfo) -> bool:
+        """
+        Updates the torrent state in the database if it already exists, otherwise does nothing.
+        Returns True if the update was successful, False otherwise.
+        """
+        if not health.is_valid():
+            self._logger.warning(f'Invalid health info ignored: {health}')
+            return False

-        return result_list
+        self._logger.debug(f'Update torrent health: {health}')
+        with db_session:
+            # Update torrent state
+            torrent_state = self.mds.TorrentState.get_for_update(infohash=health.infohash)
+            if not torrent_state:
+                self._logger.warning(f"Unknown torrent: {hexlify(health.infohash)}")
+                return False

-    def _update_torrent_result(self, response):
-        infohash = response['infohash']
-        seeders = response['seeders']
-        leechers = response['leechers']
-        last_check = response['last_check']
+            if not health.should_update(torrent_state, self_checked=True):
+                self._logger.info("Skip health update, the health in the database is fresher")
+                return False

-        self._logger.debug("Update result %s/%s for %s", seeders, leechers, hexlify(infohash))
+            torrent_state.set(seeders=health.seeders, leechers=health.leechers, last_check=health.last_check,
+                              self_checked=True)

-        with db_session:
-            # Update torrent state
-            torrent = self.mds.TorrentState.get(infohash=infohash)
-            if not torrent:
-                self._logger.warning(
-                    "Tried to update torrent health data in DB for an unknown torrent: %s", hexlify(infohash))
-                return
-            torrent.seeders = seeders
-            torrent.leechers = leechers
-            torrent.last_check = last_check
-            torrent.self_checked = True
+        if health.seeders > 0:
+            self.torrents_checked[health.infohash] = health
+        else:
+            self.torrents_checked.pop(health.infohash, None)
+
+        self.notifier[notifications.channel_entity_updated]({
+            'infohash': health.infohash_hex,
+            'num_seeders': health.seeders,
+            'num_leechers': health.leechers,
+            'last_tracker_check': health.last_check,
+            'health': 'updated'
+        })
+        return True
diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/torrentchecker_session.py b/src/tribler/core/components/torrent_checker/torrent_checker/torrentchecker_session.py
index 02d3111b014..e2cd0786de3 100644
--- a/src/tribler/core/components/torrent_checker/torrent_checker/torrentchecker_session.py
+++ b/src/tribler/core/components/torrent_checker/torrent_checker/torrentchecker_session.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import logging
 import random
 import socket
@@ -6,19 +8,23 @@
 import time
 from abc import ABCMeta, abstractmethod
 from asyncio import DatagramProtocol, Future, TimeoutError, ensure_future, get_event_loop
-
-from aiohttp import ClientResponseError, ClientSession, ClientTimeout
+from typing import List, TYPE_CHECKING

 import async_timeout
-
+from aiohttp import ClientResponseError, ClientSession, ClientTimeout
 from ipv8.taskmanager import TaskManager
+
 from tribler.core.components.socks_servers.socks5.aiohttp_connector import Socks5Connector
 from tribler.core.components.socks_servers.socks5.client import Socks5Client
-
+from tribler.core.components.torrent_checker.torrent_checker import DHT
+from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo, TrackerResponse
+from tribler.core.components.torrent_checker.torrent_checker.utils import filter_non_exceptions, gather_coros
 from tribler.core.utilities.tracker_utils import add_url_params, parse_tracker_url
-from tribler.core.utilities.unicode import hexlify
 from tribler.core.utilities.utilities import bdecode_compat

+if TYPE_CHECKING:
+    from tribler.core.components.libtorrent.download_manager.download_manager import DownloadManager
+
 # Although these are the actions for UDP trackers, they can still be used as
 # identifiers.
 TRACKER_ACTION_CONNECT = 0
@@ -32,20 +38,6 @@
 MAX_INFOHASHES_IN_SCRAPE = 60


-def create_tracker_session(tracker_url, timeout, proxy, socket_manager):
-    """
-    Creates a tracker session with the given tracker URL.
-    :param tracker_url: The given tracker URL.
-    :param timeout: The timeout for the session.
-    :return: The tracker session.
-    """
-    tracker_type, tracker_address, announce_page = parse_tracker_url(tracker_url)
-
-    if tracker_type == 'udp':
-        return UdpTrackerSession(tracker_url, tracker_address, announce_page, timeout, proxy, socket_manager)
-    return HttpTrackerSession(tracker_url, tracker_address, announce_page, timeout, proxy)
-
-
 class TrackerSession(TaskManager):
     __meta__ = ABCMeta
@@ -69,7 +61,7 @@ def __init__(self, tracker_type, tracker_url, tracker_address, announce_page, ti
         self.is_failed = False

     def __str__(self):
-        return f"Tracker[{self.tracker_type}, {self.tracker_url}]"
+        return f"{self.__class__.__name__}[{self.tracker_type}, {self.tracker_url}]"

     async def cleanup(self):
         await self.shutdown_task_manager()
@@ -101,7 +93,7 @@ def failed(self, msg=None):
         raise ValueError(result_msg)

     @abstractmethod
-    async def connect_to_tracker(self):
+    async def connect_to_tracker(self) -> TrackerResponse:
         """Does some work when a connection has been established."""

@@ -112,7 +104,7 @@ def __init__(self, tracker_url, tracker_address, announce_page, timeout, proxy):
                                       raise_for_status=True,
                                       timeout=ClientTimeout(total=self.timeout))

-    async def connect_to_tracker(self):
+    async def connect_to_tracker(self) -> TrackerResponse:
         # create the HTTP GET message
         # Note: some trackers have strange URLs, e.g.,
         #       http://moviezone.ws/announce.php?passkey=8ae51c4b47d3e7d0774a720fa511cc2a
@@ -143,12 +135,10 @@ async def connect_to_tracker(self):

         return self._process_scrape_response(body)

-    def _process_scrape_response(self, body):
+    def _process_scrape_response(self, body) -> TrackerResponse:
         """
-        This function handles the response body of a HTTP tracker,
-        parsing the results.
+        This function handles the response body of an HTTP tracker, parsing the results.
         """
-        # parse the retrieved results
         if body is None:
             self.failed(msg="no response body")
@@ -156,39 +146,32 @@ def _process_scrape_response(self, body):
         if not response_dict:
             self.failed(msg="no valid response")

-        response_list = []
-
-        unprocessed_infohash_list = self.infohash_list[:]
-        if b'files' in response_dict and isinstance(response_dict[b'files'], dict):
-            for infohash in response_dict[b'files']:
-                complete = 0
-                incomplete = 0
-                if isinstance(response_dict[b'files'][infohash], dict):
-                    complete = response_dict[b'files'][infohash].get(b'complete', 0)
-                    incomplete = response_dict[b'files'][infohash].get(b'incomplete', 0)
-
-                # Sow complete as seeders. "complete: number of peers with the entire file, i.e. seeders (integer)"
-                #  - https://wiki.theory.org/BitTorrentSpecification#Tracker_.27scrape.27_Convention
-                seeders = complete
-                leechers = incomplete
+        health_list: List[HealthInfo] = []
+        now = int(time.time())

-                # Store the information in the dictionary
-                response_list.append({'infohash': hexlify(infohash), 'seeders': seeders, 'leechers': leechers})
+        unprocessed_infohashes = set(self.infohash_list)
+        files = response_dict.get(b'files')
+        if isinstance(files, dict):
+            for infohash, file_info in files.items():
+                seeders = leechers = 0
+                if isinstance(file_info, dict):
+                    # "complete: number of peers with the entire file, i.e. seeders (integer)"
+                    #  - https://wiki.theory.org/BitTorrentSpecification#Tracker_.27scrape.27_Convention
+                    seeders = file_info.get(b'complete', 0)
+                    leechers = file_info.get(b'incomplete', 0)

-                # remove this infohash in the infohash list of this session
-                if infohash in unprocessed_infohash_list:
-                    unprocessed_infohash_list.remove(infohash)
+                unprocessed_infohashes.discard(infohash)
+                health_list.append(HealthInfo(infohash, last_check=now, seeders=seeders, leechers=leechers))

         elif b'failure reason' in response_dict:
             self._logger.info("%s Failure as reported by tracker [%s]", self, repr(response_dict[b'failure reason']))
             self.failed(msg=repr(response_dict[b'failure reason']))

         # handle the infohashes with no result (seeders/leechers = 0/0)
-        for infohash in unprocessed_infohash_list:
-            response_list.append({'infohash': hexlify(infohash), 'seeders': 0, 'leechers': 0})
+        health_list.extend(HealthInfo(infohash=infohash, last_check=now) for infohash in unprocessed_infohashes)

         self.is_finished = True
-        return {self.tracker_url: response_list}
+        return TrackerResponse(url=self.tracker_url, torrent_health_list=health_list)

     async def cleanup(self):
         """
@@ -304,7 +287,7 @@ async def cleanup(self):
         await super().cleanup()
         self.remove_transaction_id()

-    async def connect_to_tracker(self):
+    async def connect_to_tracker(self) -> TrackerResponse:
         """
         Connects to the tracker and starts querying for seed and leech data.
         :return: A dictionary containing seed/leech information per infohash
@@ -368,7 +351,7 @@ async def connect(self):
         self.generate_transaction_id()
         self.last_contact = int(time.time())

-    async def scrape(self):
+    async def scrape(self) -> TrackerResponse:
         # pack and send the message
         if sys.version_info.major > 2:
             infohash_list = self.infohash_list
@@ -405,6 +388,7 @@ async def scrape(self):

         offset = 8
         response_list = []
+        now = int(time.time())

         for infohash in self.infohash_list:
             complete, _downloaded, incomplete = struct.unpack_from('!iii', response, offset)
@@ -413,15 +397,14 @@ async def scrape(self):
             # Store the information in the hash dict to be returned.
             # Show complete as seeders. "complete: number of peers with the entire file, i.e. seeders (integer)"
             #  - https://wiki.theory.org/BitTorrentSpecification#Tracker_.27scrape.27_Convention
-            response_list.append({'infohash': hexlify(infohash),
-                                  'seeders': complete, 'leechers': incomplete})
+            response_list.append(HealthInfo(infohash, last_check=now, seeders=complete, leechers=incomplete))

         # close this socket and remove its transaction ID from the list
         self.remove_transaction_id()
         self.last_contact = int(time.time())
         self.is_finished = True

-        return {self.tracker_url: response_list}
+        return TrackerResponse(url=self.tracker_url, torrent_health_list=response_list)


 class FakeDHTSession(TrackerSession):
@@ -429,43 +412,20 @@ class FakeDHTSession(TrackerSession):
     Fake TrackerSession that manages DHT requests
     """

-    def __init__(self, dlmgr, infohash, timeout):
-        super().__init__('DHT', 'DHT', 'DHT', 'DHT', timeout)
+    def __init__(self, download_manager: DownloadManager, timeout: float):
+        super().__init__(DHT, DHT, DHT, DHT, timeout)

-        self.infohash = infohash
-        self.dlmgr = dlmgr
+        self.download_manager = download_manager

-    async def cleanup(self):
-        """
-        Cleans the session by cancelling all deferreds and closing sockets.
-        :return: A deferred that fires once the cleanup is done.
-        """
-        await super().shutdown_task_manager()
-        self.infohash_list = None
-
-    def add_infohash(self, infohash):
-        """
-        This function adds a infohash to the request list.
-        :param infohash: The infohash to be added.
-        """
-        self.infohash = infohash
-
-    async def connect_to_tracker(self):
-        """
-        Fakely connects to a tracker.
-        :return: A deferred that fires with the health information.
-        """
-        metainfo = await self.dlmgr.get_metainfo(self.infohash, timeout=self.timeout)
-        if not metainfo:
-            raise RuntimeError("Metainfo lookup error")
+    async def connect_to_tracker(self) -> TrackerResponse:
+        health_list = []
+        now = int(time.time())
+        for infohash in self.infohash_list:
+            metainfo = await self.download_manager.get_metainfo(infohash, timeout=self.timeout, raise_errors=True)
+            health = HealthInfo(infohash, last_check=now, seeders=metainfo[b'seeders'], leechers=metainfo[b'leechers'])
+            health_list.append(health)

-        return {
-            "DHT": [{
-                "infohash": hexlify(self.infohash),
-                "seeders": metainfo[b"seeders"],
-                "leechers": metainfo[b"leechers"]
-            }]
-        }
+        return TrackerResponse(url=DHT, torrent_health_list=health_list)


 class FakeBep33DHTSession(FakeDHTSession):
@@ -473,12 +433,22 @@ class FakeBep33DHTSession(FakeDHTSession):
     Fake session for a BEP33 lookup.
     """

-    async def connect_to_tracker(self):
-        """
-        Fakely connects to a tracker.
-        :return: A deferred that fires with the health information.
-        """
-        try:
-            return await self.dlmgr.dht_health_manager.get_health(self.infohash, timeout=self.timeout)
-        except TimeoutError:
-            self.failed(msg='request timed out')
+    async def connect_to_tracker(self) -> TrackerResponse:
+        coros = [self.download_manager.dht_health_manager.get_health(infohash, timeout=self.timeout)
+                 for infohash in self.infohash_list]
+        results = await gather_coros(coros)
+        return TrackerResponse(url=DHT, torrent_health_list=filter_non_exceptions(results))
+
+
+def create_tracker_session(tracker_url, timeout, proxy, socket_manager) -> TrackerSession:
+    """
+    Creates a tracker session with the given tracker URL.
+    :param tracker_url: The given tracker URL.
+    :param timeout: The timeout for the session.
+    :return: The tracker session.
+    """
+    tracker_type, tracker_address, announce_page = parse_tracker_url(tracker_url)
+
+    if tracker_type == 'udp':
+        return UdpTrackerSession(tracker_url, tracker_address, announce_page, timeout, proxy, socket_manager)
+    return HttpTrackerSession(tracker_url, tracker_address, announce_page, timeout, proxy)
diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tracker_manager.py b/src/tribler/core/components/torrent_checker/torrent_checker/tracker_manager.py
index abebafed913..8bfaec0a085 100644
--- a/src/tribler/core/components/torrent_checker/torrent_checker/tracker_manager.py
+++ b/src/tribler/core/components/torrent_checker/torrent_checker/tracker_manager.py
@@ -8,7 +8,7 @@
 from tribler.core.utilities.tracker_utils import get_uniformed_tracker_url

 MAX_TRACKER_FAILURES = 5  # if a tracker fails this amount of times in a row, its 'is_alive' will be marked as 0 (dead).
-TRACKER_RETRY_INTERVAL = 60 # A "dead" tracker will be retired every 60 seconds +TRACKER_RETRY_INTERVAL = 60 # A "dead" tracker will be retired every 60 seconds class TrackerManager: @@ -16,7 +16,7 @@ class TrackerManager: def __init__(self, state_dir: Path = None, metadata_store: MetadataStore = None): self._logger = logging.getLogger(self.__class__.__name__) self.state_dir = state_dir - self.tracker_store = metadata_store.TrackerState + self.TrackerState = metadata_store.TrackerState self.blacklist = [] self.load_blacklist() @@ -44,7 +44,7 @@ def get_tracker_info(self, tracker_url): sanitized_tracker_url = get_uniformed_tracker_url(tracker_url) if tracker_url != "DHT" else tracker_url with db_session: - tracker = list(self.tracker_store.select(lambda g: g.url == sanitized_tracker_url)) + tracker = list(self.TrackerState.select(lambda g: g.url == sanitized_tracker_url)) if tracker: return { 'id': tracker[0].url, @@ -65,17 +65,17 @@ def add_tracker(self, tracker_url): return with db_session: - num = count(g for g in self.tracker_store if g.url == sanitized_tracker_url) + num = count(g for g in self.TrackerState if g.url == sanitized_tracker_url) if num > 0: self._logger.debug("skip existing tracker: %s", repr(tracker_url)) return # insert into database - self.tracker_store(url=sanitized_tracker_url, - last_check=0, - failures=0, - alive=True, - torrents={}) + self.TrackerState(url=sanitized_tracker_url, + last_check=0, + failures=0, + alive=True, + torrents={}) def remove_tracker(self, tracker_url): """ @@ -87,12 +87,12 @@ def remove_tracker(self, tracker_url): sanitized_tracker_url = get_uniformed_tracker_url(tracker_url) with db_session: - options = self.tracker_store.select(lambda g: g.url in [tracker_url, sanitized_tracker_url]) + options = self.TrackerState.select(lambda g: g.url in [tracker_url, sanitized_tracker_url]) for option in options[:]: option.delete() @db_session - def update_tracker_info(self, tracker_url, is_successful): + def update_tracker_info(self, tracker_url, is_successful=True): """ Updates a tracker information. :param tracker_url: The given tracker_url. @@ -103,7 +103,7 @@ def update_tracker_info(self, tracker_url, is_successful): return sanitized_tracker_url = get_uniformed_tracker_url(tracker_url) - tracker = self.tracker_store.get(lambda g: g.url == sanitized_tracker_url) + tracker = self.TrackerState.get(lambda g: g.url == sanitized_tracker_url) if not tracker: self._logger.error("Trying to update the tracker info of an unknown tracker URL") @@ -117,19 +117,20 @@ def update_tracker_info(self, tracker_url, is_successful): tracker.last_check = current_time tracker.failures = failures tracker.alive = is_alive + self._logger.info(f'Tracker updated: {tracker.url}. Alive: {is_alive}. Failures: {failures}.') @db_session - def get_next_tracker_for_auto_check(self): + def get_next_tracker(self): """ - Gets the next tracker for automatic tracker-checking. - :return: The next tracker for automatic tracker-checking. + Gets the next tracker. + :return: The next tracker for torrent-checking. 
""" - tracker = self.tracker_store.select(lambda g: str(g.url) - and g.alive - and g.last_check + TRACKER_RETRY_INTERVAL <= int(time.time()) - and str(g.url) not in self.blacklist)\ - .order_by(self.tracker_store.last_check).limit(1) - + tracker = self.TrackerState.select( + lambda g: str(g.url) + and g.alive + and g.last_check + TRACKER_RETRY_INTERVAL <= int(time.time()) + and str(g.url) not in self.blacklist + ).order_by(self.TrackerState.last_check).limit(1) if not tracker: return None return tracker[0] diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/utils.py b/src/tribler/core/components/torrent_checker/torrent_checker/utils.py new file mode 100644 index 00000000000..6afde5bf860 --- /dev/null +++ b/src/tribler/core/components/torrent_checker/torrent_checker/utils.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from asyncio import gather +from typing import Awaitable, Dict, Iterable, List, TypeVar, Union, cast + +from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo, TrackerResponse + + +T = TypeVar("T") + + +async def gather_coros(coros: Iterable[Awaitable[T]]) -> List[Union[T, BaseException]]: + """ + A replacement of asyncio.gather() with a proper typing support for coroutines with the same result type + """ + results = await gather(*coros, return_exceptions=True) + return cast(List[Union[T, BaseException]], results) + + +def filter_non_exceptions(items: List[Union[T, BaseException]]) -> List[T]: + """ + Removes exceptions from the result of the `await gather_coro(...)` call + """ + return [item for item in items if not isinstance(item, BaseException)] + + +def aggregate_responses_for_infohash(infohash: bytes, responses: List[TrackerResponse]) -> HealthInfo: + """ + Finds the "best" health info (with the max number of seeders) for a specified infohash + """ + result = HealthInfo(infohash, last_check=0) + for response in responses: + for health in response.torrent_health_list: + if health.infohash == infohash and health.seeders > result.seeders: + result = health + return result + + +def aggregate_health_by_infohash(health_list: List[HealthInfo]) -> List[HealthInfo]: + """ + For each infohash in the health list, finds the "best" health info (with the max number of seeders) + """ + d: Dict[bytes, HealthInfo] = {} + for health in health_list: + infohash = health.infohash + if infohash not in d or health.seeders > d[infohash].seeders: + d[infohash] = health + return list(d.values()) diff --git a/src/tribler/core/components/tunnel/community/tunnel_community.py b/src/tribler/core/components/tunnel/community/tunnel_community.py index 16d350923fc..97bf82e6c5d 100644 --- a/src/tribler/core/components/tunnel/community/tunnel_community.py +++ b/src/tribler/core/components/tunnel/community/tunnel_community.py @@ -76,7 +76,7 @@ def __init__(self, *args, **kwargs): self.exitnode_cache: Optional[Path] = kwargs.pop('exitnode_cache', None) self.config = kwargs.pop('config', None) self.notifier = kwargs.pop('notifier', None) - self.dlmgr = kwargs.pop('dlmgr', None) + self.download_manager = kwargs.pop('dlmgr', None) self.socks_servers: List[Socks5Server] = kwargs.pop('socks_servers', []) num_competing_slots = self.config.competing_slots num_random_slots = self.config.random_slots @@ -115,7 +115,7 @@ def __init__(self, *args, **kwargs): if self.exitnode_cache is not None: self.restore_exitnodes_from_disk() - if self.dlmgr is not None: + if self.download_manager is not None: downloads_polling_interval = 1.0 self.register_task('Poll 
download manager for new or changed downloads', self._poll_download_manager, @@ -124,7 +124,7 @@ def __init__(self, *args, **kwargs): async def _poll_download_manager(self): # This must run in all circumstances, so catch all exceptions try: - dl_states = self.dlmgr.get_last_download_states() + dl_states = self.download_manager.get_last_download_states() self.monitor_downloads(dl_states) except Exception as e: # pylint: disable=broad-except self.logger.error("Error on polling Download Manager: %s", e) @@ -416,8 +416,8 @@ def remove_circuit(self, circuit_id, additional_info='', remove_now=False, destr # Make sure the circuit is marked as closing, otherwise we may end up reusing it circuit.close() - if self.dlmgr: - for download in self.dlmgr.get_downloads(): + if self.download_manager: + for download in self.download_manager.get_downloads(): self.update_torrent(affected_peers, download) # Now we actually remove the circuit @@ -534,21 +534,21 @@ def on_rendezvous_established(self, source_address, data, circuit_id): super().on_rendezvous_established(source_address, data, circuit_id) circuit = self.circuits.get(circuit_id) - if circuit and self.dlmgr: + if circuit and self.download_manager: self.update_ip_filter(circuit.info_hash) def update_ip_filter(self, info_hash): download = self.get_download(info_hash) - lt_session = self.dlmgr.get_session(download.config.get_hops()) + lt_session = self.download_manager.get_session(download.config.get_hops()) ip_addresses = [self.circuit_id_to_ip(c.circuit_id) for c in self.find_circuits(ctype=CIRCUIT_TYPE_RP_SEEDER)] + ['1.1.1.1'] - self.dlmgr.update_ip_filter(lt_session, ip_addresses) + self.download_manager.update_ip_filter(lt_session, ip_addresses) def get_download(self, lookup_info_hash): - if not self.dlmgr: + if not self.download_manager: return None - for download in self.dlmgr.get_downloads(): + for download in self.download_manager.get_downloads(): if lookup_info_hash == self.get_lookup_info_hash(download.get_def().get_infohash()): return download @@ -564,12 +564,12 @@ async def create_introduction_point(self, info_hash, required_ip=None): # the connection and the libtorrent listen port. # Starting from libtorrent 1.2.4 on Windows, listen_port() returns 0 if used in combination with a # SOCKS5 proxy. Therefore on Windows, we resort to using ports received through listen_succeeded_alert. 
- if LooseVersion(self.dlmgr.get_libtorrent_version()) < LooseVersion("1.2.0"): + if LooseVersion(self.download_manager.get_libtorrent_version()) < LooseVersion("1.2.0"): download.add_peer(('1.1.1.1', 1024)) else: hops = download.config.get_hops() - lt_listen_port = self.dlmgr.listen_ports.get(hops) - lt_listen_port = lt_listen_port or self.dlmgr.get_session(hops).listen_port() + lt_listen_port = self.download_manager.listen_ports.get(hops) + lt_listen_port = lt_listen_port or self.download_manager.get_session(hops).listen_port() for session in self.socks_servers[hops - 1].sessions: if session.udp_connection and lt_listen_port: session.udp_connection.remote_udp_address = ("127.0.0.1", lt_listen_port) diff --git a/src/tribler/core/components/tunnel/tests/test_full_session/test_tunnel_community.py b/src/tribler/core/components/tunnel/tests/test_full_session/test_tunnel_community.py index 807fc9cc8b4..3909f206ce5 100644 --- a/src/tribler/core/components/tunnel/tests/test_full_session/test_tunnel_community.py +++ b/src/tribler/core/components/tunnel/tests/test_full_session/test_tunnel_community.py @@ -63,8 +63,8 @@ async def proxy_factory(tmp_path_factory: TempPathFactory): yield factory for community in factory.communities: - if community.dlmgr: - await community.dlmgr.shutdown() + if community.download_manager: + await community.download_manager.shutdown() await community.unload() test_community.global_dht_services = defaultdict(list) # Reset the global_dht_services variable @@ -78,7 +78,7 @@ async def hidden_seeder_comm(proxy_factory: ProxyFactory, video_tdef: TorrentDef download_config = DownloadConfig() download_config.set_dest_dir(TESTS_DATA_DIR) download_config.set_hops(1) - upload = community.dlmgr.start_download(tdef=video_tdef, config=download_config) + upload = community.download_manager.start_download(tdef=video_tdef, config=download_config) def seeder_state_callback(download_state): """ @@ -160,7 +160,7 @@ def start_anon_download(tunnel_community: TriblerTunnelCommunity, """ Start an anonymous download in the main Tribler session. 
""" - download_manager = tunnel_community.dlmgr + download_manager = tunnel_community.download_manager config = DownloadConfig() config.set_dest_dir(download_manager.state_dir) config.set_hops(hops) @@ -219,7 +219,7 @@ async def tunnel_community(tmp_path_factory: TempPathFactory): yield community - await community.dlmgr.shutdown() + await community.download_manager.shutdown() await community.unload() @@ -231,7 +231,7 @@ async def test_anon_download(proxy_factory: ProxyFactory, video_seeder: Download """ relays, exit_nodes = await create_nodes(proxy_factory) await introduce_peers([tunnel_community] + relays + exit_nodes) - download_manager = tunnel_community.dlmgr + download_manager = tunnel_community.download_manager download = start_anon_download(tunnel_community, video_seeder.libtorrent_port, video_tdef) await download.wait_for_status(DLSTATUS_DOWNLOADING) @@ -282,6 +282,7 @@ def __getitem__(self, key): for e in exit_nodes: e.exit_sockets = MockExitDict(e.exit_sockets) - download = start_anon_download(leecher_community, hidden_seeder_comm.dlmgr.libtorrent_port, video_tdef, hops=1) + download = start_anon_download(leecher_community, hidden_seeder_comm.download_manager.libtorrent_port, video_tdef, + hops=1) download.set_state_callback(download_state_callback) await download_finished.wait() diff --git a/src/tribler/core/components/tunnel/tests/test_triblertunnel_community.py b/src/tribler/core/components/tunnel/tests/test_triblertunnel_community.py index f1ea203dc31..df94c591f9f 100644 --- a/src/tribler/core/components/tunnel/tests/test_triblertunnel_community.py +++ b/src/tribler/core/components/tunnel/tests/test_triblertunnel_community.py @@ -286,19 +286,19 @@ def test_update_ip_filter(self): self.nodes[0].overlay.get_download = lambda _: download lt_session = Mock() - self.nodes[0].overlay.dlmgr = Mock() - self.nodes[0].overlay.dlmgr.get_session = lambda _: lt_session - self.nodes[0].overlay.dlmgr.update_ip_filter = Mock() - self.nodes[0].overlay.dlmgr.get_downloads = lambda: [download] + self.nodes[0].overlay.download_manager = Mock() + self.nodes[0].overlay.download_manager.get_session = lambda _: lt_session + self.nodes[0].overlay.download_manager.update_ip_filter = Mock() + self.nodes[0].overlay.download_manager.get_downloads = lambda: [download] self.nodes[0].overlay.update_ip_filter(0) ips = ['1.1.1.1'] - self.nodes[0].overlay.dlmgr.update_ip_filter.assert_called_with(lt_session, ips) + self.nodes[0].overlay.download_manager.update_ip_filter.assert_called_with(lt_session, ips) circuit.ctype = CIRCUIT_TYPE_RP_SEEDER self.nodes[0].overlay.update_ip_filter(0) ips = [self.nodes[0].overlay.circuit_id_to_ip(circuit.circuit_id), '1.1.1.1'] - self.nodes[0].overlay.dlmgr.update_ip_filter.assert_called_with(lt_session, ips) + self.nodes[0].overlay.download_manager.update_ip_filter.assert_called_with(lt_session, ips) def test_update_torrent(self): """ diff --git a/src/tribler/core/logger/logger.py b/src/tribler/core/logger/logger.py index bd1c1e1091f..57575b4a6ca 100644 --- a/src/tribler/core/logger/logger.py +++ b/src/tribler/core/logger/logger.py @@ -16,6 +16,9 @@ def filter(self, record): return record.levelno < logging.ERROR +log_factory = logging.getLogRecordFactory() + + def load_logger_config(app_mode, log_dir, current_process_is_primary=True): """ Loads tribler-gui module logger configuration. Note that this function should be called explicitly to @@ -45,6 +48,11 @@ def setup_logging(app_mode, log_dir: Path, config_path: Path): """ Setup logging configuration with the given YAML file. 
""" + def record_factory(*args, **kwargs): + record = log_factory(*args, **kwargs) + record.app_mode = app_mode + return record + logger.info(f'Load logger config: app_mode={app_mode}, config_path={config_path}, dir={log_dir}') if not config_path.exists(): print(f'Logger config not found in {config_path}. Using default configs.', file=sys.stderr) @@ -65,9 +73,9 @@ def setup_logging(app_mode, log_dir: Path, config_path: Path): # Create log directory if it does not exist if not log_dir.exists(): log_dir.mkdir(parents=True) - config = yaml.safe_load(config_text) logging.config.dictConfig(config) + logging.setLogRecordFactory(record_factory) logger.info(f'Config loaded for app_mode={app_mode}') except Exception as e: # pylint: disable=broad-except error_description = format_error_description(e) diff --git a/src/tribler/core/logger/logger.yaml b/src/tribler/core/logger/logger.yaml index 3b3b1fe0a56..e56852eaf94 100644 --- a/src/tribler/core/logger/logger.yaml +++ b/src/tribler/core/logger/logger.yaml @@ -7,9 +7,9 @@ filters: # Logging formatter formatters: standard: - format: "[PID:%(process)d] %(asctime)s - %(levelname)s - %(name)s(%(lineno)d) - %(message)s" + format: "[%(app_mode)s PID:%(process)d] %(asctime)s - %(levelname)s - %(name)s(%(lineno)d) - %(message)s" error: - format: "[PID:%(process)d] %(asctime)s - %(levelname)s <%(module)s:%(lineno)d> %(name)s.%(funcName)s(): %(message)s" + format: "[%(app_mode)s PID:%(process)d] %(asctime)s - %(levelname)s <%(module)s:%(lineno)d> %(name)s.%(funcName)s(): %(message)s" # Logging handlers handlers: diff --git a/src/tribler/core/utilities/dependencies.py b/src/tribler/core/utilities/dependencies.py index 8378a58b0c5..af6173f0f20 100644 --- a/src/tribler/core/utilities/dependencies.py +++ b/src/tribler/core/utilities/dependencies.py @@ -5,44 +5,60 @@ """ import logging import re -import tribler from enum import Enum +from pathlib import Path from typing import Iterator, Optional -from tribler.core.utilities.path_util import Path + +import tribler + # fmt: off logger = logging.getLogger(__name__) -Scope = Enum('Scope', 'core gui') + +class Scope(Enum): + core = 'core' + gui = 'gui' + # Exceptional pip packages where the name does not match with actual import. 
 package_to_import_mapping = {
     'Faker': 'faker',
-    'sentry-sdk': 'sentry_sdk'
 }
 
 
-# pylint: disable=import-outside-toplevel
 def get_dependencies(scope: Scope) -> Iterator[str]:
-    def _get_path_to_requirements_txt() -> Optional[Path]:
-        root_path = Path(tribler.__file__).parent.parent.parent
-        if scope == Scope.core:
-            return root_path / 'requirements-core.txt'
-        if scope == Scope.gui:
-            return root_path / 'requirements.txt'
-        raise AttributeError(f'Scope is {scope} but should be in {[s for s in Scope]}')  # pylint: disable=unnecessary-comprehension
+    requirements_path = _get_path_to_requirements_txt(scope)
+    return _get_pip_dependencies(requirements_path)
+
 
-    return _get_pip_dependencies(_get_path_to_requirements_txt())
+def _get_path_to_requirements_txt(scope: Scope) -> Path:
+    root_path = Path(tribler.__file__).parent.parent.parent
+    if scope == Scope.core:
+        return root_path / 'requirements-core.txt'
+    if scope == Scope.gui:
+        return root_path / 'requirements.txt'
+    raise AttributeError(f'Scope is {scope} but should be in {list(Scope)}')
+
+
+def _get_pip_dependencies(path_to_requirements: Path) -> Iterator[str]:
+    logger.info(f'Getting dependencies from: {path_to_requirements}')
+    requirements_text = path_to_requirements.read_text()
+    return _extract_libraries_from_requirements(requirements_text)
 
 
 def _extract_libraries_from_requirements(text: str) -> Iterator[str]:
     logger.debug(f'requirements.txt content: {text}')
-    for library in filter(None, text.split('\n')):
-        pip_package = re.split(r'[><=~]', library, maxsplit=1)[0]
-        yield package_to_import_mapping.get(pip_package, pip_package)
+    for line in text.split('\n'):
+        package = _extract_package_from_line(line)
+        if package:
+            yield package
 
 
-def _get_pip_dependencies(path_to_requirements: Path) -> Iterator[str]:
-    logger.info(f'Getting dependencies from: {path_to_requirements}')
-    return _extract_libraries_from_requirements(path_to_requirements.read_text())
+def _extract_package_from_line(line: str) -> Optional[str]:
+    without_comment = line.partition('#')[0].strip()
+    without_version = re.split(r'[><=~]', without_comment, maxsplit=1)[0]
+    with_underscores = without_version.replace('-', '_')
+    package = package_to_import_mapping.get(with_underscores, with_underscores)
+    return package or None
diff --git a/src/tribler/core/utilities/tests/test_dependencies.py b/src/tribler/core/utilities/tests/test_dependencies.py
index 3419962f446..27e0cca49be 100644
--- a/src/tribler/core/utilities/tests/test_dependencies.py
+++ b/src/tribler/core/utilities/tests/test_dependencies.py
@@ -12,18 +12,17 @@
 
 # pylint: disable=protected-access
 
-# fmt: off
-
 def test_extract_libraries_from_requirements():
     # check that the libraries are extracted from the text correctly
-    text = 'PyQt5>=5.14\n' \
-           'psutil\n' \
-           '\n' \
-           'configobj\n'
-
+    text = (
+        'PyQt5>=5.14\n'
+        'psutil # some comment\n'
+        '\n'
+        '# comment line\n'
+        'configobj\n'
+    )
     assert list(_extract_libraries_from_requirements(text)) == ['PyQt5', 'psutil', 'configobj']
 
-
 def test_pip_dependencies_gen():
     # check that the libraries are extracted from the file correctly
     path = Path(tribler.__file__).parent.parent.parent / 'requirements.txt'
diff --git a/src/tribler/core/utilities/unicode.py b/src/tribler/core/utilities/unicode.py
index c225007c9ee..75949cd2d6a 100644
--- a/src/tribler/core/utilities/unicode.py
+++ b/src/tribler/core/utilities/unicode.py
@@ -79,5 +79,5 @@ def recursive_bytes(obj):
     return obj
 
 
-def hexlify(binary):
+def hexlify(binary: bytes) -> str:
     return binascii.hexlify(binary).decode('utf-8')
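+
+# Example (illustrative, not part of the patch): hexlify(b'\x00\xab') == '00ab'.
+# The annotations make explicit that raw bytes go in and a hex string comes out.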
diff --git a/src/tribler/gui/widgets/tablecontentmodel.py b/src/tribler/gui/widgets/tablecontentmodel.py
index 92958216813..3870db1d9fb 100644
--- a/src/tribler/gui/widgets/tablecontentmodel.py
+++ b/src/tribler/gui/widgets/tablecontentmodel.py
@@ -4,6 +4,7 @@
 import uuid
 from collections import deque
 from dataclasses import dataclass, field
+from datetime import timedelta
 from enum import Enum, auto
 from typing import Callable, Dict, List
 
@@ -255,11 +256,13 @@ def add_items(self, new_items, on_top=False, remote=False):
         new_data_items = non_torrents + torrents
 
         new_item_uid_map = {}
+        insert_index = 0
         for item in new_data_items:
             item_uid = get_item_uid(item)
             new_item_uid_map[item_uid] = insert_index
             if 'infohash' in item:
                 new_item_uid_map[item['infohash']] = insert_index
+            insert_index += 1
         self.beginResetModel()
         self.data_items = new_data_items
         self.item_uid_map = new_item_uid_map
@@ -515,6 +518,22 @@ def item_txt(self, index, role, is_editing: bool = False):
             )
             return tooltip_txt
 
+        if role == Qt.ToolTipRole and column_type == Column.HEALTH:
+            last_tracker_check = item.get('last_tracker_check')
+            if item.get('health') == HEALTH_CHECKING:
+                return 'Checking...'
+            if last_tracker_check is None:
+                return 'Unknown'
+            if last_tracker_check == 0:
+                return 'Not checked'
+
+            td = timedelta(seconds=time.time() - last_tracker_check)
+            if td.days > 0:
+                return f'Checked: {td.days} {"day" if td.days == 1 else "days"} ago'
+
+            time_without_microseconds = str(td).partition('.')[0]
+            return f'Checked: {time_without_microseconds} ago'
+
         # The 'name' column is special in the sense that we want to draw the title and tags ourselves.
         # At the same time, the column must keep this name so that renaming of torrent files keeps working, hence this check.
         if column_type == Column.NAME and not is_editing:
@@ -546,15 +565,16 @@ def update_node_info(self, update_dict):
         it is time to update the labels.
         """
-        if (
-            self.channel_info.get("public_key") == update_dict.get("public_key") is not None
-            and self.channel_info.get("id") == update_dict.get("id") is not None
-        ):
+        MISSING = object()  # sentinel that avoids a false-positive comparison when a key is missing
+        public_key_is_equal = self.channel_info.get("public_key", None) == update_dict.get("public_key", MISSING)
+        id_is_equal = self.channel_info.get("id", None) == update_dict.get("id", MISSING)
+        if public_key_is_equal and id_is_equal:
             self.channel_info.update(**update_dict)
             self.info_changed.emit([])
             return
 
-        row = self.item_uid_map.get(get_item_uid(update_dict))
+        uid = get_item_uid(update_dict)
+        row = self.item_uid_map.get(uid)
         if row is not None and row < len(self.data_items):
             self.data_items[row].update(**update_dict)
             self.dataChanged.emit(self.index(row, 0), self.index(row, len(self.columns)), [])
diff --git a/src/tribler/gui/widgets/triblertablecontrollers.py b/src/tribler/gui/widgets/triblertablecontrollers.py
index 65858971e9a..238b770a7c2 100644
--- a/src/tribler/gui/widgets/triblertablecontrollers.py
+++ b/src/tribler/gui/widgets/triblertablecontrollers.py
@@ -3,6 +3,7 @@
 The responsibility of the controller is to populate the table view with the data contained in a specific model.
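+Controllers gain optional behaviour through mixins: HealthCheckerMixin, for instance,
+wires torrent health checks to the table-view click signals, and ContextMenuMixin
+contributes the context menu.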
""" import json +import logging import time from PyQt5.QtCore import QObject, QTimer, Qt @@ -151,6 +152,7 @@ def _on_selection_changed(self, selected, deselected): # pylint: disable=W0613 class HealthCheckerMixin: def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.health_checker_logger = logging.getLogger('HealthCheckerMixin') connect( self.table_view.delegate.health_status_widget.clicked, @@ -159,66 +161,27 @@ def __init__(self, *args, **kwargs): connect(self.table_view.torrent_clicked, self.check_torrent_health) def check_torrent_health(self, data_item, forced=False): - # ACHTUNG: The health check can be triggered multiple times for a single infohash - # by e.g. selection and click signals if not dict_item_is_any_of(data_item, 'type', [REGULAR_TORRENT]): return - - infohash = data_item['infohash'] - if Column.HEALTH not in self.model.column_position: return # Check if the entry still exists in the table + infohash = data_item['infohash'] row = self.model.item_uid_map.get(infohash) - items = self.model.data_items - if row is None or row >= len(items): + if row is None: return - data_item = items[row] if not forced and data_item.get('health', HEALTH_UNCHECKED) != HEALTH_UNCHECKED: return data_item['health'] = HEALTH_CHECKING health_cell_index = self.model.index(row, self.model.column_position[Column.HEALTH]) self.model.dataChanged.emit(health_cell_index, health_cell_index, []) - request_manager.get(f"metadata/torrents/{infohash}/health", self.on_health_response, - url_params={"nowait": True, "refresh": True}, - capture_errors=False, - priority=QNetworkRequest.LowPriority) - - def on_health_response(self, response): - total_seeders = 0 - total_leechers = 0 - - if not response or 'error' in response or 'checking' in response: - return - - infohash = response['infohash'] - for _, status in response['health'].items(): - if 'error' in status: - continue # Timeout or invalid status - total_seeders += int(status['seeders']) - total_leechers += int(status['leechers']) - - self.update_torrent_health(infohash, total_seeders, total_leechers) - - def update_torrent_health(self, infohash, seeders, leechers): - # Check if details widget is still showing the same entry and the entry still exists in the table - row = self.model.item_uid_map.get(infohash) - if row is None: - return - - data_item = self.model.data_items[row] - data_item['num_seeders'] = seeders - data_item['num_leechers'] = leechers - data_item['last_tracker_check'] = time.time() - data_item['health'] = get_health( - data_item['num_seeders'], data_item['num_leechers'], data_item['last_tracker_check'] + request_manager.get( + f"metadata/torrents/{infohash}/health", + capture_errors=False, + priority=QNetworkRequest.LowPriority ) - if Column.HEALTH in self.model.column_position: - index = self.model.index(row, self.model.column_position[Column.HEALTH]) - self.model.dataChanged.emit(index, index, []) - class ContextMenuMixin: def __init__(self, *args, **kwargs):