From 232940d53cd9d77831a0807d2f8f64f2b3212031 Mon Sep 17 00:00:00 2001
From: Toni
Date: Tue, 1 Oct 2024 12:00:51 +0200
Subject: [PATCH] harden proxy

---
Review notes (not part of the commit; text in this section is ignored when
the patch is applied):

Changes since v1:
- The decoded URL is joined with SITEURL and kept in `raw_url`, so relative
  URLs are split, validated and requested in their absolute form. The
  separate `decoded_raw_url` variable (and the two hunks that only renamed
  `raw_url` to it) are gone.
- A URL without a hostname is answered with 403 instead of passing None to
  the hostname regex.
- The hostname pattern is applied with `re.fullmatch`, so a trailing newline
  cannot slip past `$`.
- test_proxy_invalid_hostname_characters uses `_` instead of `#`: a `#`
  starts the URL fragment, which is never delivered to the view.
- Only one blank line is added after the new tests.
- `index` lines were dropped because the blob hashes no longer match.
  Context-line whitespace was reconstructed; apply with
  `git apply --ignore-whitespace` if it does not match the tree exactly.

 geonode/proxy/tests.py | 26 ++++++++++++++++++++++++++
 geonode/proxy/views.py | 14 ++++++++++++++
 2 files changed, 40 insertions(+)

diff --git a/geonode/proxy/tests.py b/geonode/proxy/tests.py
--- a/geonode/proxy/tests.py
+++ b/geonode/proxy/tests.py
@@ -101,6 +101,32 @@ def test_proxy_not_allowed_path(self):
         response = self.client.get(f"{self.proxy_url}?url=http://example.com/xyz")
         self.assertEqual(response.status_code, 403, response.status_code)
 
+    # Test Cases for SSRF Vulnerability
+
+    @patch("geonode.proxy.views.proxy_urls_registry", ProxyUrlsRegistry().set([TEST_DOMAIN]))
+    def test_proxy_ssrf_vulnerability(self):
+        """Test if SSRF vulnerability is prevented."""
+        ssrf_payload = (
+            f"{self.proxy_url}?url=https://geoserver%C5%95.{TEST_DOMAIN}%C5%95%09attacker.com%5cu0040google.com"
+        )
+        response = self.client.get(ssrf_payload)
+        # Expect the SSRF attempt to be blocked (403 status)
+        self.assertEqual(response.status_code, 403, response.status_code)
+
+    @patch("geonode.proxy.views.proxy_urls_registry", ProxyUrlsRegistry().set([TEST_DOMAIN]))
+    def test_proxy_invalid_hostname_characters(self):
+        """Test that invalid hostname characters are blocked."""
+        invalid_host_payload = f"{self.proxy_url}?url=https://exam_ple.com"
+        response = self.client.get(invalid_host_payload)
+        self.assertEqual(response.status_code, 403, response.status_code)
+
+    @patch("geonode.proxy.views.proxy_urls_registry", ProxyUrlsRegistry().set([TEST_DOMAIN]))
+    def test_proxy_valid_hostname_characters(self):
+        """Test that valid hostname characters are allowed."""
+        valid_host_payload = f"{self.proxy_url}?url=https://valid-hostname.com"
+        response = self.client.get(valid_host_payload)
+        self.assertNotEqual(response.status_code, 403, response.status_code)
+
     @override_settings(PROXY_ALLOWED_PARAMS_NEEDLES=(), PROXY_ALLOWED_PATH_NEEDLES=())
     # @patch("geonode.proxy.views.proxy_urls_registry", ProxyUrlsRegistry().clear())
     def test_validate_remote_links_hosts(self):
diff --git a/geonode/proxy/views.py b/geonode/proxy/views.py
--- a/geonode/proxy/views.py
+++ b/geonode/proxy/views.py
@@ -57,6 +57,8 @@
 from geonode.assets.utils import get_default_asset
 from zipstream import ZipStream
 from .utils import proxy_urls_registry
+from urllib.parse import unquote
+
 
 logger = logging.getLogger(__name__)
 
@@ -92,6 +94,9 @@ def proxy(
         )
 
     raw_url = url or request.GET["url"]
+    # Decode percent-escapes before the URL is resolved and validated, so the
+    # host checks below run on the same URL that is used for the request.
+    raw_url = unquote(raw_url)
     raw_url = urljoin(settings.SITEURL, raw_url) if raw_url.startswith("/") else raw_url
     url = urlsplit(raw_url)
     scheme = str(url.scheme)
@@ -108,6 +113,15 @@ def proxy(
     ):
         proxy_allowed_hosts.append(url.hostname)
 
+    # Reject URLs that have no hostname, or whose hostname contains anything
+    # other than letters, digits, dots and hyphens.
+    if not url.hostname or not re.fullmatch(r"[a-zA-Z0-9.\-]+", url.hostname):
+        return HttpResponse(
+            "Invalid request.",
+            status=403,
+            content_type="text/plain",
+        )
+
     if not validate_host(extract_ip_or_domain(raw_url), proxy_allowed_hosts):
         return HttpResponse(
             "The url provided to the proxy service is not a valid hostname.",