diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index f5107b9e99..658b0eeb68 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -195,17 +195,17 @@ async def probe_files(self): while True: for sd_hash in self.sd_hashes.read_samples(10_000): self.refresh_reachable_set() - log.info("Querying stream %s for peers.", sd_hash.hex()[:8]) distance = Distance(sd_hash) node_ids = list(self._reachable_by_node_id.keys()) node_ids.sort(key=lambda node_id: distance(node_id)) k_closest = [self._reachable_by_node_id[node_id] for node_id in node_ids[:8]] + found = False + working = False for response in asyncio.as_completed( [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]): response = await response - self.probed_streams_metric.labels("global").inc() if response and response.found: - self.announced_streams_metric.labels("global").inc() + found = True blob_peers = [] for compact_addr in response.found_compact_addresses: try: @@ -215,7 +215,7 @@ async def probe_files(self): for blob_peer in blob_peers: response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash) if response: - self.working_streams_metric.labels("global").inc() + working = True log.info("Found responsive peer for %s: %s:%d(%d)", sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port or -1, blob_peer.tcp_port or -1) @@ -223,6 +223,12 @@ async def probe_files(self): log.info("Found dead peer for %s: %s:%d(%d)", sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port or -1, blob_peer.tcp_port or -1) + self.probed_streams_metric.labels("global").inc() + if found: + self.announced_streams_metric.labels("global").inc() + if working: + self.working_streams_metric.labels("global").inc() + log.info("Done querying stream %s for peers. Found: %s, working: %s", sd_hash.hex()[:8], found, working) await asyncio.sleep(.5) @property @@ -358,7 +364,7 @@ async def crawl_routing_table(self, host, port, node_id=None): max_distance = int.from_bytes(bytes([0xff] * 48), 'big') peers = set() factor = 2048 - for i in range(200): + for i in range(1000): response = await self.request_peers(address, port, key) new_peers = list(response.get_close_kademlia_peers(peer)) if response else None if not new_peers: @@ -479,6 +485,7 @@ async def test(): for (host, port) in conf.known_dht_nodes: probes.append(asyncio.create_task(crawler.crawl_routing_table(host, port))) await asyncio.gather(*probes) + await crawler.flush_to_db() probe_task = asyncio.ensure_future(crawler.probe_files()) await crawler.process()