Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trusted proxies config #67

Merged
merged 6 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,32 @@ import and configure the library with your Castle API secret.
configuration.blacklisted = ['HTTP-X-header']

# Castle needs the original IP of the client, not the IP of your proxy or load balancer.
# we try to fetch proper ip based on X-Forwarded-For or Remote-Addr headers in that order
# but sometimes proper ip may be stored in different header or order could be different.
# SDK can extract ip automatically for you, but you must configure which ip_headers you would like to use
# The SDK will only trust the proxy chain as defined in the configuration.
# We try to fetch the client IP based on X-Forwarded-For or Remote-Addr headers in that order,
# but sometimes the client IP may be stored in a different header or order.
# The SDK can be configured to look for the client IP address in headers that you specify.
# If the specified header or X-Forwarded-For default contains a proxy chain with public IP addresses,
# then one of the following must be set
# 1. The trusted_proxies value must match the known proxy IP's
# 2. The trusted_proxy_depth value must be set to the number of known trusted proxies in the chain (see below)
configuration.ip_headers = []

# Additionally to make X-Forwarded-For and other headers work better discovering client ip address,
# and not the address of a reverse proxy server, you can define trusted proxies
# which will help to fetch proper ip from those headers

# In order to extract the client IP of the X-Forwarded-For header
# and not the address of a reverse proxy server, you must define all trusted public proxies
# you can achieve this by listing all the proxies ip defined by string or regular expressions
# in trusted_proxies setting
configuration.trusted_proxies = []
# *Note: proxies list can be provided as an array of regular expressions
# or by providing number of trusted proxies used in the chain
configuration.trusted_proxy_depth = 0

# If there is no possibility to define options above and there is no other header which can have client ip
# then you may set trust_proxy_chain = true to trust all of the proxy IP's in X-Forwarded-For
configuration.trust_proxy_chain = false

# *Note: default always marked as trusty list is here: Castle::Configuration::TRUSTED_PROXIES

Tracking
Expand Down
23 changes: 23 additions & 0 deletions castle/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def __init__(self):
self.failover_strategy = FAILOVER_STRATEGY
self.ip_headers = []
self.trusted_proxies = []
self.trust_proxy_chain = False
self.trusted_proxy_depth = None

def isValid(self):
return self.host and self.port and self.api_secret
Expand Down Expand Up @@ -149,6 +151,27 @@ def trusted_proxies(self, value):
else:
raise ConfigurationError

@property
def trust_proxy_chain(self):
return self.__trust_proxy_chain

@trust_proxy_chain.setter
def trust_proxy_chain(self, value):
if isinstance(value, bool):
self.__trust_proxy_chain = value
else:
raise ConfigurationError

@property
def trusted_proxy_depth(self):
return self.__trusted_proxy_depth

@trusted_proxy_depth.setter
def trusted_proxy_depth(self, value):
if isinstance(value, (int, type(None))):
self.__trusted_proxy_depth = int(0 if value is None else value)
else:
raise ConfigurationError

# pylint: disable=invalid-name
configuration = Configuration()
40 changes: 22 additions & 18 deletions castle/extractors/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

# ordered list of ip headers for ip extraction
DEFAULT = ['X-Forwarded-For', 'Remote-Addr']

# list of header which are used with proxy depth setting
DEPTH_RELATED = ['X-Forwarded-For']

class ExtractorsIp(object):
def __init__(self, headers):
Expand All @@ -14,32 +15,27 @@ def __init__(self, headers):
else:
self.ip_headers = DEFAULT
self.proxies = configuration.trusted_proxies + TRUSTED_PROXIES
self.trust_proxy_chain = configuration.trust_proxy_chain
self.trusted_proxy_depth = configuration.trusted_proxy_depth

def call(self):
all_ips = []

for ip_header in self.ip_headers:
ips = self._ips_from(ip_header)
filtered_ips = self._remove_proxies(ips)

if len(filtered_ips) > 0:
return filtered_ips[-1]

all_ips = all_ips + ips

# fallback to first whatever ip
if len(all_ips) > 0:
return all_ips[0]
ip_value = self._remove_proxies(ips)
if ip_value:
return ip_value
all_ips += ips

return None
return next(iter(all_ips), None)

def _remove_proxies(self, ips):
result = []
if self.trust_proxy_chain:
return next(iter(ips), None)

for ip_address in ips:
if not self._is_proxy(ip_address):
result.append(ip_address)
return result
result = [ip_address for ip_address in ips if not self._is_proxy(ip_address)]
return (result or [None])[-1]

def _is_proxy(self, ip_address):
for proxy_re in self.proxies:
Expand All @@ -54,4 +50,12 @@ def _ips_from(self, header):
if not value:
return []

return re.split(r'[,\s]+', value.strip())
ips = re.split(r'[,\s]+', value.strip())

return self._limit_proxy_depth(ips, header)

def _limit_proxy_depth(self, ips, ip_header):
if ip_header in DEPTH_RELATED:
ips = ips[:len(ips)-self.trusted_proxy_depth]

return ips
25 changes: 24 additions & 1 deletion castle/test/extractors/ip_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class ExtractorsIpTestCase(unittest.TestCase):
def tearDown(self):
configuration.ip_headers = []
configuration.trusted_proxies = []
configuration.trust_proxy_chain = False
configuration.trusted_proxy_depth = None

def test_extract_ip(self):
headers = {'X-Forwarded-For': '1.2.3.5'}
Expand Down Expand Up @@ -38,13 +40,34 @@ def test_extract_ip_when_all_trusted_proxies(self):
'127.0.0.1'
)

def test_extract_ip_when_trust_proxy_chain(self):
xf_header = """
6.6.6.6,2.2.2.3,6.6.6.5
"""
headers = {'Remote-Addr': '6.6.6.4', 'X-Forwarded-For': xf_header}
configuration.trust_proxy_chain = True
self.assertEqual(
ExtractorsIp(headers).call(),
'6.6.6.6'
)

def test_extract_ip_when_trust_proxy_depth(self):
xf_header = """
6.6.6.6,2.2.2.3,6.6.6.5
"""
headers = {'Remote-Addr': '6.6.6.4', 'X-Forwarded-For': xf_header}
configuration.trusted_proxy_depth = 1
self.assertEqual(
ExtractorsIp(headers).call(),
'2.2.2.3'
)

def test_extract_ip_for_spoof_ip_attempt(self):
headers = {'Client-Ip': '6.6.6.6', 'X-Forwarded-For': '6.6.6.6, 2.2.2.3, 192.168.0.7'}
self.assertEqual(
ExtractorsIp(headers).call(),
'2.2.2.3'
)
#

def test_extract_ip_for_spoof_ip_attempt_when_all_trusted_proxies(self):
headers = {'Client-Ip': '6.6.6.6', 'X-Forwarded-For': '6.6.6.6, 2.2.2.3, 192.168.0.7'}
Expand Down