Skip to content

Commit

Permalink
Modify to use syntax up to Python 3.7
Browse files Browse the repository at this point in the history
Signed-off-by: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
  • Loading branch information
thombashi committed May 2, 2024
1 parent 0c36378 commit cc96e90
Show file tree
Hide file tree
Showing 36 changed files with 254 additions and 283 deletions.
12 changes: 5 additions & 7 deletions docs/make_readme.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ def write_examples(maker):
maker.write_lines(
[
"More examples are available at ",
"https://{:s}.rtfd.io/en/latest/pages/usage/index.html".format(PROJECT_NAME),
f"https://{PROJECT_NAME:s}.rtfd.io/en/latest/pages/usage/index.html",
]
)


def update_help():
for command in ["tcset", "tcdel", "tcshow"]:
runner = SubprocessRunner("{:s} -h".format(command))
runner = SubprocessRunner(f"{command:s} -h")
runner.run(env=dict(os.environ, LC_ALL="C.UTF-8"))
help_file_path = "pages/usage/{command:s}/{command:s}_help_output.txt".format(
command=command
Expand All @@ -66,7 +66,7 @@ def main():
PROJECT_NAME,
OUTPUT_DIR,
is_make_toc=True,
project_url="https://github.com/thombashi/{}".format(PROJECT_NAME),
project_url=f"https://github.com/thombashi/{PROJECT_NAME}",
)
maker.examples_dir_name = "usage"

Expand All @@ -84,12 +84,10 @@ def main():

maker.set_indent_level(0)
maker.write_chapter("Documentation")
maker.write_lines(["https://{:s}.rtfd.io/".format(PROJECT_NAME)])
maker.write_lines([f"https://{PROJECT_NAME:s}.rtfd.io/"])

maker.write_chapter("Troubleshooting")
maker.write_lines(
["https://{:s}.rtfd.io/en/latest/pages/troubleshooting.html".format(PROJECT_NAME)]
)
maker.write_lines([f"https://{PROJECT_NAME:s}.rtfd.io/en/latest/pages/troubleshooting.html"])

maker.write_chapter("Docker image")
maker.write_lines(["https://hub.docker.com/r/thombashi/tcconfig/"])
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


MODULE_NAME = "tcconfig"
REPOSITORY_URL = "https://github.com/thombashi/{:s}".format(MODULE_NAME)
REPOSITORY_URL = f"https://github.com/thombashi/{MODULE_NAME:s}"
REQUIREMENT_DIR = "requirements"
ENCODING = "utf8"

Expand Down Expand Up @@ -58,9 +58,9 @@ def get_release_command_class():
include_package_data=True,
packages=setuptools.find_packages(exclude=["test*"]),
project_urls={
"Documentation": "https://{:s}.rtfd.io/".format(MODULE_NAME),
"Documentation": f"https://{MODULE_NAME:s}.rtfd.io/",
"Source": REPOSITORY_URL,
"Tracker": "{:s}/issues".format(REPOSITORY_URL),
"Tracker": f"{REPOSITORY_URL:s}/issues",
},
python_requires=">=3.7",
install_requires=install_requires,
Expand Down
2 changes: 1 addition & 1 deletion tcconfig/__version__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__author__ = "Tsuyoshi Hombashi"
__copyright__ = "Copyright 2016, {}".format(__author__)
__copyright__ = f"Copyright 2016, {__author__}"
__license__ = "MIT License"
__version__ = "0.28.0"
__maintainer__ = __author__
Expand Down
4 changes: 1 addition & 3 deletions tcconfig/_argparse_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ def __init__(self, version, description=""):
"""
),
)
self.parser.add_argument(
"-V", "--version", action="version", version="%(prog)s {}".format(version)
)
self.parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {version}")
self._add_tc_command_arg_group()
self._add_log_level_argument_group()

Expand Down
6 changes: 3 additions & 3 deletions tcconfig/_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _has_capabilies(bin_path, capabilities):
return False

bin_path = os.path.realpath(bin_path)
proc = spr.SubprocessRunner("{:s} {:s}".format(getcap_bin_path, bin_path))
proc = spr.SubprocessRunner(f"{getcap_bin_path:s} {bin_path:s}")
if proc.run() != 0:
logger.error(proc.stderr)
sys.exit(proc.returncode)
Expand All @@ -56,9 +56,9 @@ def _has_capabilies(bin_path, capabilities):
has_capabilies = True
for capability in capabilities:
if re.search(capability, getcap_output):
logger.debug("{:s} has {:s} capability".format(bin_path, capability))
logger.debug(f"{bin_path:s} has {capability:s} capability")
else:
logger.debug("{:s} has no {:s} capability".format(bin_path, capability))
logger.debug(f"{bin_path:s} has no {capability:s} capability")
has_capabilies = False

capability = "+ep"
Expand Down
22 changes: 11 additions & 11 deletions tcconfig/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _to_regular_bin_path(file_path):
_bin_path_cache[command] = bin_path.abspath()
return _bin_path_cache[command]

for sbin_path in ("/sbin/{:s}".format(command), "/usr/sbin/{:s}".format(command)):
for sbin_path in (f"/sbin/{command:s}", f"/usr/sbin/{command:s}"):
if os.path.isfile(sbin_path):
_bin_path_cache[command] = _to_regular_bin_path(sbin_path)
return _bin_path_cache[command]
Expand All @@ -60,7 +60,7 @@ def check_command_installation(command):
if find_bin_path(command):
return

logger.error("command not found: {}".format(command))
logger.error(f"command not found: {command}")
sys.exit(errno.ENOENT)


Expand Down Expand Up @@ -88,20 +88,20 @@ def validate_within_min_max(param_name, value, min_value, max_value, unit):
if unit is None:
unit = ""
else:
unit = "[{:s}]".format(unit)
unit = f"[{unit:s}]"

if value > max_value:
raise ParameterError(
"'{:s}' is too high".format(param_name),
expected="<={:s}{:s}".format(DataProperty(max_value).to_str(), unit),
value="{:s}{:s}".format(DataProperty(value).to_str(), unit),
f"'{param_name:s}' is too high",
expected=f"<={DataProperty(max_value).to_str():s}{unit:s}",
value=f"{DataProperty(value).to_str():s}{unit:s}",
)

if value < min_value:
raise ParameterError(
"'{:s}' is too low".format(param_name),
expected=">={:s}{:s}".format(DataProperty(min_value).to_str(), unit),
value="{:s}{:s}".format(DataProperty(value).to_str(), unit),
f"'{param_name:s}' is too low",
expected=f">={DataProperty(min_value).to_str():s}{unit:s}",
value=f"{DataProperty(value).to_str():s}{unit:s}",
)


Expand Down Expand Up @@ -137,8 +137,8 @@ def run_command_helper(
error_msg = "\n".join(
[
"command execution failed",
" command={}".format(command),
" stderr={}".format(runner.stderr),
f" command={command}",
f" stderr={runner.stderr}",
]
)

Expand Down
14 changes: 6 additions & 8 deletions tcconfig/_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,16 @@ def create_veth_table(self, container):
sys.exit(errno.EPERM)

container_info = self.extract_container_info(container)
logger.debug(
"found container: name={}, pid={}".format(container_info.name, container_info.pid)
)
logger.debug(f"found container: name={container_info.name}, pid={container_info.pid}")

try:
netns_path = self.__get_netns_path(container_info.name)

if not os.path.lexists(netns_path):
logger.debug("make symlink to {}".format(netns_path))
logger.debug(f"make symlink to {netns_path}")

try:
Path("/proc/{:d}/ns/net".format(container_info.pid)).symlink(netns_path)
Path(f"/proc/{container_info.pid:d}/ns/net").symlink(netns_path)
except PermissionError as e:
logger.error(e)
sys.exit(errno.EPERM)
Expand Down Expand Up @@ -179,7 +177,7 @@ def __create_ifindex_table(self, container_name):
IfIndex.create()

proc = SubprocessRunner(
"ip netns exec {ns} ip link show type veth".format(ns=container_name),
f"ip netns exec {container_name} ip link show type veth",
dry_run=False,
)
if proc.run() != 0:
Expand All @@ -195,7 +193,7 @@ def __create_ifindex_table(self, container_name):
if not match:
continue

logger.debug("parse veth @{} [{:02d}] {}".format(container_name, i, line))
logger.debug(f"parse veth @{container_name} [{i:02d}] {line}")
ifindex, ifname, peer_ifindex = match.groups()
IfIndex.insert(
IfIndex(
Expand All @@ -212,7 +210,7 @@ def __create_ifindex_table(self, container_name):
return proc.returncode

for line in proc.stdout.splitlines():
logger.debug("parse veth @docker-host [{:02d}] {}".format(i, line))
logger.debug(f"parse veth @docker-host [{i:02d}] {line}")
match = veth_groups_regexp.search(line)
if not match:
continue
Expand Down
2 changes: 1 addition & 1 deletion tcconfig/_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __str__(self, *args, **kwargs):
item_list = [Exception.__str__(self, *args, **kwargs)]

if self._target:
item_list.append("{} not found: {}".format(self._target_type, self._target))
item_list.append(f"{self._target_type} not found: {self._target}")

return " ".join(item_list).strip()

Expand Down
29 changes: 13 additions & 16 deletions tcconfig/_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ def load_tcconfig(self, config_file_path):
self.__config_table = json.load(fp)

schema(self.__config_table)
self.__logger.debug(
"tc config file: {:s}".format(json.dumps(self.__config_table, indent=4))
)
dumped_config = json.dumps(self.__config_table, indent=4)
self.__logger.debug(f"tc config file: {dumped_config:s}")

def get_tcconfig_commands(self):
from .tcset import get_arg_parser
Expand Down Expand Up @@ -76,18 +75,16 @@ def get_tcconfig_commands(self):
if not filter_table:
continue

option_list = [device, "--direction={:s}".format(direction)] + (
option_list = [device, f"--direction={direction:s}"] + (
["--docker"] if is_container else []
)

for key, value in filter_table.items():
arg_item = "--{:s}={}".format(key, value)
arg_item = f"--{key:s}={value}"

parse_result = get_arg_parser().parse_known_args(["dummy", arg_item])
if parse_result[1]:
self.__logger.debug(
"unknown parameter: key={}, value={}".format(key, value)
)
self.__logger.debug(f"unknown parameter: key={key}, value={value}")
continue

option_list.append(arg_item)
Expand All @@ -98,7 +95,7 @@ def get_tcconfig_commands(self):
Network.Ipv4.ANYWHERE,
Network.Ipv6.ANYWHERE,
):
option_list.append("--src-network={:s}".format(src_network))
option_list.append(f"--src-network={src_network:s}")
except pp.ParseException:
pass

Expand All @@ -108,19 +105,19 @@ def get_tcconfig_commands(self):
Network.Ipv4.ANYWHERE,
Network.Ipv6.ANYWHERE,
):
option_list.append("--dst-network={:s}".format(dst_network))
option_list.append(f"--dst-network={dst_network:s}")
except pp.ParseException:
pass

try:
src_port = self.__parse_tc_filter_src_port(tc_filter)
option_list.append("--src-port={}".format(src_port))
option_list.append(f"--src-port={src_port}")
except pp.ParseException:
pass

try:
dst_port = self.__parse_tc_filter_dst_port(tc_filter)
option_list.append("--dst-port={}".format(dst_port))
option_list.append(f"--dst-port={dst_port}")
except pp.ParseException:
pass

Expand All @@ -140,29 +137,29 @@ def get_tcconfig_commands(self):

@staticmethod
def __parse_tc_filter_src_network(text):
network_pattern = pp.SkipTo("{:s}=".format(Tc.Param.SRC_NETWORK), include=True) + pp.Word(
network_pattern = pp.SkipTo(f"{Tc.Param.SRC_NETWORK:s}=", include=True) + pp.Word(
pp.alphanums + "." + "/"
)

return network_pattern.parseString(text)[-1]

@staticmethod
def __parse_tc_filter_dst_network(text):
network_pattern = pp.SkipTo("{:s}=".format(Tc.Param.DST_NETWORK), include=True) + pp.Word(
network_pattern = pp.SkipTo(f"{Tc.Param.DST_NETWORK:s}=", include=True) + pp.Word(
pp.alphanums + "." + "/"
)

return network_pattern.parseString(text)[-1]

@staticmethod
def __parse_tc_filter_src_port(text):
port_pattern = pp.SkipTo("{:s}=".format(Tc.Param.SRC_PORT), include=True) + pp.Word(pp.nums)
port_pattern = pp.SkipTo(f"{Tc.Param.SRC_PORT:s}=", include=True) + pp.Word(pp.nums)

return port_pattern.parseString(text)[-1]

@staticmethod
def __parse_tc_filter_dst_port(text):
port_pattern = pp.SkipTo("{:s}=".format(Tc.Param.DST_PORT), include=True) + pp.Word(pp.nums)
port_pattern = pp.SkipTo(f"{Tc.Param.DST_PORT:s}=", include=True) + pp.Word(pp.nums)

return port_pattern.parseString(text)[-1]

Expand Down
32 changes: 15 additions & 17 deletions tcconfig/_iptables.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_iptables_base_command():
return iptables_path

# debian/ubuntu may return /sbin/xtables-multi
return "{:s} iptables".format(iptables_path)
return f"{iptables_path:s} iptables"

return None

Expand Down Expand Up @@ -74,7 +74,7 @@ def __init__(
self.__protocol = protocol

if chain not in VALID_CHAIN_LIST:
raise ValueError("invalid chain: {}".format(chain))
raise ValueError(f"invalid chain: {chain}")

self.__chain = chain

Expand All @@ -93,15 +93,15 @@ def __repr__(self, *args, **kwargs):
str_list = []

if Integer(self.line_number).is_type():
str_list.append("line-num={}".format(self.line_number))
str_list.append(f"line-num={self.line_number}")

str_list.extend(
[
"protocol={:s}".format(self.protocol),
"source={:s}".format(self.source),
"destination={:s}".format(self.destination),
"mark_id={:d}".format(self.mark_id),
"chain={:s}".format(self.chain),
f"protocol={self.protocol:s}",
f"source={self.source:s}",
f"destination={self.destination:s}",
f"mark_id={self.mark_id:d}",
f"chain={self.chain:s}",
]
)

Expand All @@ -111,16 +111,16 @@ def to_append_command(self):
Integer(self.mark_id).validate()

command_item_list = [
"{:s} -A {:s} -t mangle -j MARK".format(get_iptables_base_command(), self.chain),
"--set-mark {}".format(self.mark_id),
f"{get_iptables_base_command():s} -A {self.chain:s} -t mangle -j MARK",
f"--set-mark {self.mark_id}",
]

if typepy.is_not_null_string(self.protocol) or Integer(self.protocol).is_type():
command_item_list.append("-p {}".format(self.protocol))
command_item_list.append(f"-p {self.protocol}")
if self.__is_valid_srcdst(self.source):
command_item_list.append("-s {:s}".format(self.source))
command_item_list.append(f"-s {self.source:s}")
if self.__is_valid_srcdst(self.destination):
command_item_list.append("-d {:s}".format(self.destination))
command_item_list.append(f"-d {self.destination:s}")

return " ".join(command_item_list)

Expand Down Expand Up @@ -167,9 +167,7 @@ def clear(self):
def get_iptables(self):
self.__check_execution_authority()

proc = SubprocessRunner(
"{:s} {:s}".format(get_iptables_base_command(), LIST_MANGLE_TABLE_OPTION)
)
proc = SubprocessRunner(f"{get_iptables_base_command():s} {LIST_MANGLE_TABLE_OPTION:s}")
if proc.run() != 0:
raise OSError(proc.returncode, proc.stderr)

Expand All @@ -179,7 +177,7 @@ def get_unique_mark_id(self):
self.__check_execution_authority()

mark_id_list = [mangle.mark_id for mangle in self.parse()]
logger.debug("mangle mark list: {}".format(mark_id_list))
logger.debug(f"mangle mark list: {mark_id_list}")

unique_mark_id = 1 + self.__MARK_ID_OFFSET
while unique_mark_id < self.__MAX_MARK_ID:
Expand Down
Loading

0 comments on commit cc96e90

Please sign in to comment.