Skip to content

Commit

Permalink
Trusted proxies config (#67)
Browse files Browse the repository at this point in the history
* Update readme.rst

* Add new settings to configuration.py

* Fix naming

* Update ip extractor

* Add corresponding test cases
  • Loading branch information
Maria Korlotian authored May 22, 2020
1 parent bea24be commit e80ec58
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 23 deletions.
25 changes: 21 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,32 @@ import and configure the library with your Castle API secret.
configuration.blacklisted = ['HTTP-X-header']
# Castle needs the original IP of the client, not the IP of your proxy or load balancer.
# we try to fetch proper ip based on X-Forwarded-For or Remote-Addr headers in that order
# but sometimes proper ip may be stored in different header or order could be different.
# SDK can extract ip automatically for you, but you must configure which ip_headers you would like to use
# The SDK will only trust the proxy chain as defined in the configuration.
# We try to fetch the client IP based on X-Forwarded-For or Remote-Addr headers in that order,
# but sometimes the client IP may be stored in a different header or order.
# The SDK can be configured to look for the client IP address in headers that you specify.
# If the specified header or X-Forwarded-For default contains a proxy chain with public IP addresses,
# then one of the following must be set
# 1. The trusted_proxies value must match the known proxy IP's
# 2. The trusted_proxy_depth value must be set to the number of known trusted proxies in the chain (see below)
configuration.ip_headers = []
# Additionally to make X-Forwarded-For and other headers work better discovering client ip address,
# and not the address of a reverse proxy server, you can define trusted proxies
# which will help to fetch proper ip from those headers
# In order to extract the client IP of the X-Forwarded-For header
# and not the address of a reverse proxy server, you must define all trusted public proxies
# you can achieve this by listing all the proxies ip defined by string or regular expressions
# in trusted_proxies setting
configuration.trusted_proxies = []
# *Note: proxies list can be provided as an array of regular expressions
# or by providing number of trusted proxies used in the chain
configuration.trusted_proxy_depth = 0
# If there is no possibility to define options above and there is no other header which can have client ip
# then you may set trust_proxy_chain = true to trust all of the proxy IP's in X-Forwarded-For
configuration.trust_proxy_chain = false
# *Note: default always marked as trusty list is here: Castle::Configuration::TRUSTED_PROXIES
Tracking
Expand Down
23 changes: 23 additions & 0 deletions castle/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def __init__(self):
self.failover_strategy = FAILOVER_STRATEGY
self.ip_headers = []
self.trusted_proxies = []
self.trust_proxy_chain = False
self.trusted_proxy_depth = None

def isValid(self):
return self.host and self.port and self.api_secret
Expand Down Expand Up @@ -149,6 +151,27 @@ def trusted_proxies(self, value):
else:
raise ConfigurationError

@property
def trust_proxy_chain(self):
return self.__trust_proxy_chain

@trust_proxy_chain.setter
def trust_proxy_chain(self, value):
if isinstance(value, bool):
self.__trust_proxy_chain = value
else:
raise ConfigurationError

@property
def trusted_proxy_depth(self):
return self.__trusted_proxy_depth

@trusted_proxy_depth.setter
def trusted_proxy_depth(self, value):
if isinstance(value, (int, type(None))):
self.__trusted_proxy_depth = int(0 if value is None else value)
else:
raise ConfigurationError

# pylint: disable=invalid-name
configuration = Configuration()
40 changes: 22 additions & 18 deletions castle/extractors/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

# ordered list of ip headers for ip extraction
DEFAULT = ['X-Forwarded-For', 'Remote-Addr']

# list of header which are used with proxy depth setting
DEPTH_RELATED = ['X-Forwarded-For']

class ExtractorsIp(object):
def __init__(self, headers):
Expand All @@ -14,32 +15,27 @@ def __init__(self, headers):
else:
self.ip_headers = DEFAULT
self.proxies = configuration.trusted_proxies + TRUSTED_PROXIES
self.trust_proxy_chain = configuration.trust_proxy_chain
self.trusted_proxy_depth = configuration.trusted_proxy_depth

def call(self):
all_ips = []

for ip_header in self.ip_headers:
ips = self._ips_from(ip_header)
filtered_ips = self._remove_proxies(ips)

if len(filtered_ips) > 0:
return filtered_ips[-1]

all_ips = all_ips + ips

# fallback to first whatever ip
if len(all_ips) > 0:
return all_ips[0]
ip_value = self._remove_proxies(ips)
if ip_value:
return ip_value
all_ips += ips

return None
return next(iter(all_ips), None)

def _remove_proxies(self, ips):
result = []
if self.trust_proxy_chain:
return next(iter(ips), None)

for ip_address in ips:
if not self._is_proxy(ip_address):
result.append(ip_address)
return result
result = [ip_address for ip_address in ips if not self._is_proxy(ip_address)]
return (result or [None])[-1]

def _is_proxy(self, ip_address):
for proxy_re in self.proxies:
Expand All @@ -54,4 +50,12 @@ def _ips_from(self, header):
if not value:
return []

return re.split(r'[,\s]+', value.strip())
ips = re.split(r'[,\s]+', value.strip())

return self._limit_proxy_depth(ips, header)

def _limit_proxy_depth(self, ips, ip_header):
if ip_header in DEPTH_RELATED:
ips = ips[:len(ips)-self.trusted_proxy_depth]

return ips
25 changes: 24 additions & 1 deletion castle/test/extractors/ip_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class ExtractorsIpTestCase(unittest.TestCase):
def tearDown(self):
configuration.ip_headers = []
configuration.trusted_proxies = []
configuration.trust_proxy_chain = False
configuration.trusted_proxy_depth = None

def test_extract_ip(self):
headers = {'X-Forwarded-For': '1.2.3.5'}
Expand Down Expand Up @@ -38,13 +40,34 @@ def test_extract_ip_when_all_trusted_proxies(self):
'127.0.0.1'
)

def test_extract_ip_when_trust_proxy_chain(self):
xf_header = """
6.6.6.6,2.2.2.3,6.6.6.5
"""
headers = {'Remote-Addr': '6.6.6.4', 'X-Forwarded-For': xf_header}
configuration.trust_proxy_chain = True
self.assertEqual(
ExtractorsIp(headers).call(),
'6.6.6.6'
)

def test_extract_ip_when_trust_proxy_depth(self):
xf_header = """
6.6.6.6,2.2.2.3,6.6.6.5
"""
headers = {'Remote-Addr': '6.6.6.4', 'X-Forwarded-For': xf_header}
configuration.trusted_proxy_depth = 1
self.assertEqual(
ExtractorsIp(headers).call(),
'2.2.2.3'
)

def test_extract_ip_for_spoof_ip_attempt(self):
headers = {'Client-Ip': '6.6.6.6', 'X-Forwarded-For': '6.6.6.6, 2.2.2.3, 192.168.0.7'}
self.assertEqual(
ExtractorsIp(headers).call(),
'2.2.2.3'
)
#

def test_extract_ip_for_spoof_ip_attempt_when_all_trusted_proxies(self):
headers = {'Client-Ip': '6.6.6.6', 'X-Forwarded-For': '6.6.6.6, 2.2.2.3, 192.168.0.7'}
Expand Down

0 comments on commit e80ec58

Please sign in to comment.