Skip to content

Commit

Permalink
feat(logger): config.ini, proxy, network stats (#547)
Browse files Browse the repository at this point in the history
* feat(logger): config.ini, proxy, network stats

* feat(logger): network analysis only for debug flag

* feat(logger): switch to time.perf_counter

* Handle psutil.AccessDenied

* Add testing for network telemetry and configure logger

* AI Patching Suggestion
  • Loading branch information
dylanpulver authored Jul 31, 2024
1 parent c84c17e commit 83a07ec
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 4 deletions.
81 changes: 80 additions & 1 deletion safety/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import configparser
from dataclasses import asdict
from enum import Enum
import requests
import time

import json
import logging
Expand Down Expand Up @@ -46,9 +48,68 @@
except ImportError:
from typing_extensions import Annotated


LOG = logging.getLogger(__name__)

def get_network_telemetry():
import psutil
import socket
network_info = {}
try:
# Get network IO statistics
net_io = psutil.net_io_counters()
network_info['bytes_sent'] = net_io.bytes_sent
network_info['bytes_recv'] = net_io.bytes_recv
network_info['packets_sent'] = net_io.packets_sent
network_info['packets_recv'] = net_io.packets_recv

# Test network speed (download speed)
test_url = "http://example.com" # A URL to test the download speed
start_time = time.perf_counter()
try:
response = requests.get(test_url, timeout=10)
end_time = time.perf_counter()
download_time = end_time - start_time
download_speed = len(response.content) / download_time
network_info['download_speed'] = download_speed
except requests.RequestException as e:
network_info['download_speed'] = None
network_info['error'] = str(e)


# Get network addresses
net_if_addrs = psutil.net_if_addrs()
network_info['interfaces'] = {iface: [addr.address for addr in addrs if addr.family == socket.AF_INET] for iface, addrs in net_if_addrs.items()}

# Get network connections
net_connections = psutil.net_connections(kind='inet')
network_info['connections'] = [
{
'fd': conn.fd,
'family': conn.family,
'type': conn.type,
'laddr': f"{conn.laddr.ip}:{conn.laddr.port}",
'raddr': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else None,
'status': conn.status
}
for conn in net_connections
]

# Get network interface stats
net_if_stats = psutil.net_if_stats()
network_info['interface_stats'] = {
iface: {
'isup': stats.isup,
'duplex': stats.duplex,
'speed': stats.speed,
'mtu': stats.mtu
}
for iface, stats in net_if_stats.items()
}
except psutil.AccessDenied as e:
network_info['error'] = f"Access denied when trying to gather network telemetry: {e}"

return network_info

def preprocess_args(f):
if '--debug' in sys.argv:
index = sys.argv.index('--debug')
Expand All @@ -66,6 +127,24 @@ def configure_logger(ctx, param, debug):

logging.basicConfig(format='%(asctime)s %(name)s => %(message)s', level=level)

if debug:
# Log the contents of the config.ini file
config = configparser.ConfigParser()
config.read(CONFIG_FILE_USER)
LOG.debug('Config file contents:')
for section in config.sections():
LOG.debug('[%s]', section)
for key, value in config.items(section):
LOG.debug('%s = %s', key, value)

# Log the proxy settings if they were attempted
if 'proxy' in config:
LOG.debug('Proxy configuration attempted with settings: %s', dict(config['proxy']))

# Collect and log network telemetry data
network_telemetry = get_network_telemetry()
LOG.debug('Network telemetry: %s', network_telemetry)

@click.group(cls=SafetyCLILegacyGroup, help=CLI_MAIN_INTRODUCTION, epilog=DEFAULT_EPILOG)
@auth_options()
@proxy_options
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ install_requires =
safety_schemas>=0.0.2
typing-extensions>=4.7.1
filelock~=3.12.2
psutil~=6.0.0

[options.entry_points]
console_scripts =
Expand Down
3 changes: 2 additions & 1 deletion test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ typer
pydantic>=1.10.12
safety_schemas>=0.0.2
typing-extensions>=4.7.1
filelock~=3.12.2
filelock~=3.12.2
psutil~=6.0.0
97 changes: 95 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import shutil
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from unittest.mock import Mock, patch
import requests
import socket
import psutil

import click
from click.testing import CliRunner
Expand Down Expand Up @@ -563,4 +565,95 @@ def dummy_function():
preprocessed_args = sys.argv[1:] # Exclude the script name 'safety'

# Assert the preprocessed arguments
assert preprocessed_args == ['--debug', 'scan'], f"Preprocessed args: {preprocessed_args}"
assert preprocessed_args == ['--debug', 'scan'], f"Preprocessed args: {preprocessed_args}"

class TestNetworkTelemetry(unittest.TestCase):

@patch('psutil.net_io_counters')
@patch('psutil.net_if_addrs')
@patch('psutil.net_connections')
@patch('psutil.net_if_stats')
@patch('requests.get')
def test_get_network_telemetry(self, mock_requests_get, mock_net_if_stats, mock_net_connections, mock_net_if_addrs, mock_net_io_counters):
# Setup mocks
mock_net_io_counters.return_value = Mock(bytes_sent=1000, bytes_recv=2000, packets_sent=10, packets_recv=20)
mock_net_if_addrs.return_value = {'eth0': [Mock(family=socket.AF_INET, address='192.168.1.1')]}
mock_net_connections.return_value = [Mock(fd=1, family=socket.AF_INET, type=socket.SOCK_STREAM,
laddr=Mock(ip='192.168.1.1', port=8080),
raddr=Mock(ip='93.184.216.34', port=80),
status='ESTABLISHED')]
mock_net_if_stats.return_value = {'eth0': Mock(isup=True, duplex=2, speed=1000, mtu=1500)}

mock_response = Mock()
mock_response.content = b'a' * 1000 # Mock content length
mock_requests_get.return_value = mock_response

# Run the function
result = cli.get_network_telemetry()

# Assert the network telemetry data
self.assertEqual(result['bytes_sent'], 1000)
self.assertEqual(result['bytes_recv'], 2000)
self.assertEqual(result['packets_sent'], 10)
self.assertEqual(result['packets_recv'], 20)
self.assertIsNotNone(result['download_speed'])
self.assertEqual(result['interfaces'], {'eth0': ['192.168.1.1']})
self.assertEqual(result['connections'][0]['laddr'], '192.168.1.1:8080')
self.assertEqual(result['connections'][0]['raddr'], '93.184.216.34:80')
self.assertEqual(result['connections'][0]['status'], 'ESTABLISHED')
self.assertEqual(result['interface_stats']['eth0']['isup'], True)
self.assertEqual(result['interface_stats']['eth0']['duplex'], 2)
self.assertEqual(result['interface_stats']['eth0']['speed'], 1000)
self.assertEqual(result['interface_stats']['eth0']['mtu'], 1500)

@patch('requests.get', side_effect=requests.RequestException('Network error'))
def test_get_network_telemetry_request_exception(self, mock_requests_get):
# Run the function
result = cli.get_network_telemetry()

# Assert the download_speed is None and error is captured
self.assertIsNone(result['download_speed'])
self.assertIn('error', result)

@patch('psutil.net_io_counters', side_effect=psutil.AccessDenied('Access denied'))
def test_get_network_telemetry_access_denied(self, mock_net_io_counters):
# Run the function
result = cli.get_network_telemetry()

# Assert the error is captured
self.assertIn('error', result)
self.assertIn('Access denied', result['error'])

class TestConfigureLogger(unittest.TestCase):

@patch('configparser.ConfigParser.read')
@patch('safety.cli.get_network_telemetry')
def test_configure_logger_debug(self, mock_get_network_telemetry, mock_config_read):
mock_get_network_telemetry.return_value = {'dummy_key': 'dummy_value'}
mock_config_read.return_value = None

ctx = Mock()
param = Mock()
debug = True

with patch('sys.argv', ['--debug', 'true']), \
patch('logging.basicConfig') as mock_basicConfig, \
patch('configparser.ConfigParser.items', return_value=[('key', 'value')]), \
patch('configparser.ConfigParser.sections', return_value=['section']):
cli.configure_logger(ctx, param, debug)
mock_basicConfig.assert_called_with(format='%(asctime)s %(name)s => %(message)s', level=logging.DEBUG)

# Check if network telemetry logging was called
mock_get_network_telemetry.assert_called_once()

@patch('configparser.ConfigParser.read')
def test_configure_logger_non_debug(self, mock_config_read):
mock_config_read.return_value = None

ctx = Mock()
param = Mock()
debug = False

with patch('logging.basicConfig') as mock_basicConfig:
cli.configure_logger(ctx, param, debug)
mock_basicConfig.assert_called_with(format='%(asctime)s %(name)s => %(message)s', level=logging.CRITICAL)

0 comments on commit 83a07ec

Please sign in to comment.