Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Add functional tests to validate NSG feature (#1395)
Browse files Browse the repository at this point in the history
* Tests

Newly created scaleset proxy are behind NSG #1325

Apply existing rules to newly created scaleset in a new region #1326
* added README.md
  • Loading branch information
stishkin authored and stas committed Nov 4, 2021
1 parent 3ea0405 commit cbd065c
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/api-service/functional_tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Functional tests.

- `nsg_tst.py`

Requires an existing OneFuzz deployment running in Azure.
The OneFuzz deployment has to be already pre-configured using OneFuzz CLI or deployment `config_path` can be passed as command line argument to `nsg_test.py`.

`nsg_test.py` validates that OneFuzz configures NSGs correctly to allow or block access to debug proxy.
4 changes: 4 additions & 0 deletions src/api-service/functional_tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
263 changes: 263 additions & 0 deletions src/api-service/functional_tests/nsg_tst.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import argparse
import subprocess # nosec
import sys
import time
import uuid

from onefuzz.api import Onefuzz
from onefuzztypes.enums import OS, ScalesetState


class NsgTests:
def __init__(self, onefuzz_config_path=None):
self.onefuzz = Onefuzz(config_path=onefuzz_config_path)

def allow_all(self) -> None:
instance_config = self.onefuzz.instance_config.get()
instance_config.proxy_nsg_config.allowed_ips = ["*"]
self.onefuzz.instance_config.update(instance_config)

def block_all(self) -> None:
instance_config = self.onefuzz.instance_config.get()
instance_config.proxy_nsg_config.allowed_ips = []
self.onefuzz.instance_config.update(instance_config)

def generate_name(self) -> str:
return "nsg-test-%s" % uuid.uuid4().hex

def get_proxy_ip(self, scaleset: ScalesetState) -> str:
machine_id = scaleset.nodes[0].machine_id
scaleset_id = scaleset.scaleset_id
timeout_seconds = 60
time_waited = 0
wait_poll = 5

ip = None
print("Retrieving proxy IP for region: %s" % scaleset.region)

while time_waited < timeout_seconds and ip is None:
proxy = self.onefuzz.scaleset_proxy.create(
scaleset_id, machine_id, dst_port=1
)
ip = proxy.ip
if ip is None:
time.sleep(wait_poll)

if ip is None:
raise Exception("Failed to get proxy IP for region: %s" % scaleset.region)
return ip

def test_connection(self, ip: str) -> bool:
cmd = ["ping", ip]
if sys.platform == "linux":
cmd.append("-w")
cmd.append("5")
return 0 == subprocess.call(cmd) # nosec

def wait_for_scaleset(self, scaleset: ScalesetState) -> ScalesetState:
timeout_seconds = 600
wait_poll = 10
# wait for scaleset creation to finish
time_waited = 0
tmp_scaleset = scaleset
while (
time_waited < timeout_seconds
and tmp_scaleset.error is None
and tmp_scaleset.state != ScalesetState.running
):
tmp_scaleset = self.onefuzz.scalesets.get(tmp_scaleset.scaleset_id)
print(
"Waiting for scaleset creation... Current scaleset state: %s"
% tmp_scaleset.state
)
time.sleep(wait_poll)
time_waited = time_waited + wait_poll

if tmp_scaleset.error:
raise Exception(
"Failed to provision scaleset %s" % (tmp_scaleset.scaleset_id)
)

return tmp_scaleset

def create_pool(self, pool_name):
self.onefuzz.pools.create(pool_name, OS.linux)

def create_scaleset(self, pool_name: str, region: str) -> ScalesetState:
scaleset = self.onefuzz.scalesets.create(pool_name, 1, region=region)
return self.wait_for_scaleset(scaleset)

def wait_for_nsg_rules_to_apply(self) -> None:
time.sleep(120)

def test_proxy_cycle(self, pool_name: str, region: str) -> None:
scaleset = self.create_scaleset(pool_name, region)
try:
ip = self.get_proxy_ip(scaleset)
print("Allow connection")
self.allow_all()
self.wait_for_nsg_rules_to_apply()
# can ping since all allowed
result = self.test_connection(ip)
if not result:
raise Exception("Failed to connect to proxy")

print("Block connection")
self.block_all()
self.wait_for_nsg_rules_to_apply()
# should not be able to ping since all blocked
result = self.test_connection(ip)
if result:
raise Exception("Connected to proxy")

reset_result = self.onefuzz.scaleset_proxy.reset(region)
if not reset_result:
raise Exception("Failed to reset proxy for region %s" % region)

updated_ip = self.get_proxy_ip(scaleset)
# should be still blocked, since we blocked before we cycled
# the proxy
result = self.test_connection(updated_ip)
if result:
raise Exception("Connected to proxy")

self.allow_all()
self.wait_for_nsg_rules_to_apply()
result = self.test_connection(updated_ip)

if not result:
raise Exception("Failed to connect to proxy")

finally:
self.onefuzz.scalesets.shutdown(scaleset.scaleset_id, now=True)

def test_proxy_access(self, pool_name: str, region: str) -> None:
scaleset = self.create_scaleset(pool_name, region)
try:
ip = self.get_proxy_ip(scaleset)
print("Allow connection")

self.allow_all()
self.wait_for_nsg_rules_to_apply()
# can ping since all allowed
result = self.test_connection(ip)
if not result:
raise Exception("Failed to connect to proxy")

print("Block connection")
self.block_all()
self.wait_for_nsg_rules_to_apply()
# should not be able to ping since all blocked
result = self.test_connection(ip)
if result:
raise Exception("Connected to proxy")

print("Allow connection")
self.allow_all()
self.wait_for_nsg_rules_to_apply()
# can ping since all allowed
result = self.test_connection(ip)
if not result:
raise Exception("Failed to connect to proxy")

print("Block connection")
self.block_all()
self.wait_for_nsg_rules_to_apply()
# should not be able to ping since all blocked
result = self.test_connection(ip)
if result:
raise Exception("Connected to proxy")
finally:
self.onefuzz.scalesets.shutdown(scaleset.scaleset_id, now=True)

def test_new_scaleset_region(
self, pool_name: str, region1: str, region2: str
) -> None:
if region1 == region2:
raise Exception(
(
"Test input parameter validation failure.",
" Scalesets expted to be in different regions",
)
)

scaleset1 = self.create_scaleset(pool_name, region1)
scaleset2 = None
try:
ip1 = self.get_proxy_ip(scaleset1)

print("Block connection")
self.block_all()
self.wait_for_nsg_rules_to_apply()
# should not be able to ping since all blocked
print("Attempting connection for region %s" % region1)
result = self.test_connection(ip1)
if result:
raise Exception("Connected to proxy1 in region %s" % region1)

print("Allow connection")
self.allow_all()
self.wait_for_nsg_rules_to_apply()
# can ping since all allowed
print("Attempting connection for region %s" % region1)
result = self.test_connection(ip1)
if not result:
raise Exception("Failed to connect to proxy1 in region %s" % region1)

print("Creating scaleset in region %s" % region2)
scaleset2 = self.create_scaleset(pool_name, region2)

ip2 = self.get_proxy_ip(scaleset2)
# should not be able to ping since all blocked
print("Attempting connection for region %s" % region2)
result = self.test_connection(ip2)
if not result:
raise Exception("Failed to connect to proxy2 in region %s" % region2)

print("Block connection")
self.block_all()
self.wait_for_nsg_rules_to_apply()
# should not be able to ping since all blocked
print("Attempting connection for region %s" % region1)
result = self.test_connection(ip1)
if result:
raise Exception("Connected to proxy1 in region" % region1)

# should not be able to ping since all blocked
print("Attempting connection for region %s" % region2)
result = self.test_connection(ip2)
if result:
raise Exception("Connected to proxy2 in region %s" % region2)

finally:
self.onefuzz.scalesets.shutdown(scaleset1.scaleset_id, now=True)

if scaleset2:
self.onefuzz.scalesets.shutdown(scaleset2.scaleset_id, now=True)


def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--region1", required=True)
parser.add_argument("--region2", required=True)
parser.add_argument("--config_path", default=None)
args = parser.parse_args()

t = NsgTests(args.config_path)
pool_name = t.generate_name()
t.create_pool(pool_name)
print("Test basic proxy access")
t.test_proxy_access(pool_name, args.region1)
print("Test new region addition access")
t.test_new_scaleset_region(pool_name, args.region1, args.region2)
print("Test proxy cycle")
t.test_proxy_cycle(pool_name, args.region1)


if __name__ == "__main__":
main()

0 comments on commit cbd065c

Please sign in to comment.