diff --git a/docs/make_readme.py b/docs/make_readme.py index ec4d8ee..ec92655 100644 --- a/docs/make_readme.py +++ b/docs/make_readme.py @@ -39,14 +39,14 @@ def write_examples(maker): maker.write_lines( [ "More examples are available at ", - "https://{:s}.rtfd.io/en/latest/pages/usage/index.html".format(PROJECT_NAME), + f"https://{PROJECT_NAME:s}.rtfd.io/en/latest/pages/usage/index.html", ] ) def update_help(): for command in ["tcset", "tcdel", "tcshow"]: - runner = SubprocessRunner("{:s} -h".format(command)) + runner = SubprocessRunner(f"{command:s} -h") runner.run(env=dict(os.environ, LC_ALL="C.UTF-8")) help_file_path = "pages/usage/{command:s}/{command:s}_help_output.txt".format( command=command @@ -66,7 +66,7 @@ def main(): PROJECT_NAME, OUTPUT_DIR, is_make_toc=True, - project_url="https://github.com/thombashi/{}".format(PROJECT_NAME), + project_url=f"https://github.com/thombashi/{PROJECT_NAME}", ) maker.examples_dir_name = "usage" @@ -84,12 +84,10 @@ def main(): maker.set_indent_level(0) maker.write_chapter("Documentation") - maker.write_lines(["https://{:s}.rtfd.io/".format(PROJECT_NAME)]) + maker.write_lines([f"https://{PROJECT_NAME:s}.rtfd.io/"]) maker.write_chapter("Troubleshooting") - maker.write_lines( - ["https://{:s}.rtfd.io/en/latest/pages/troubleshooting.html".format(PROJECT_NAME)] - ) + maker.write_lines([f"https://{PROJECT_NAME:s}.rtfd.io/en/latest/pages/troubleshooting.html"]) maker.write_chapter("Docker image") maker.write_lines(["https://hub.docker.com/r/thombashi/tcconfig/"]) diff --git a/setup.py b/setup.py index 42aa771..3a0f524 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ MODULE_NAME = "tcconfig" -REPOSITORY_URL = "https://github.com/thombashi/{:s}".format(MODULE_NAME) +REPOSITORY_URL = f"https://github.com/thombashi/{MODULE_NAME:s}" REQUIREMENT_DIR = "requirements" ENCODING = "utf8" @@ -58,9 +58,9 @@ def get_release_command_class(): include_package_data=True, packages=setuptools.find_packages(exclude=["test*"]), project_urls={ - "Documentation": "https://{:s}.rtfd.io/".format(MODULE_NAME), + "Documentation": f"https://{MODULE_NAME:s}.rtfd.io/", "Source": REPOSITORY_URL, - "Tracker": "{:s}/issues".format(REPOSITORY_URL), + "Tracker": f"{REPOSITORY_URL:s}/issues", }, python_requires=">=3.7", install_requires=install_requires, diff --git a/tcconfig/__version__.py b/tcconfig/__version__.py index 1ce7630..0369e0f 100644 --- a/tcconfig/__version__.py +++ b/tcconfig/__version__.py @@ -1,5 +1,5 @@ __author__ = "Tsuyoshi Hombashi" -__copyright__ = "Copyright 2016, {}".format(__author__) +__copyright__ = f"Copyright 2016, {__author__}" __license__ = "MIT License" __version__ = "0.28.0" __maintainer__ = __author__ diff --git a/tcconfig/_argparse_wrapper.py b/tcconfig/_argparse_wrapper.py index 4c21cd0..2d4e1b5 100644 --- a/tcconfig/_argparse_wrapper.py +++ b/tcconfig/_argparse_wrapper.py @@ -25,9 +25,7 @@ def __init__(self, version, description=""): """ ), ) - self.parser.add_argument( - "-V", "--version", action="version", version="%(prog)s {}".format(version) - ) + self.parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {version}") self._add_tc_command_arg_group() self._add_log_level_argument_group() diff --git a/tcconfig/_capabilities.py b/tcconfig/_capabilities.py index 134a702..35af1cb 100644 --- a/tcconfig/_capabilities.py +++ b/tcconfig/_capabilities.py @@ -47,7 +47,7 @@ def _has_capabilies(bin_path, capabilities): return False bin_path = os.path.realpath(bin_path) - proc = spr.SubprocessRunner("{:s} {:s}".format(getcap_bin_path, bin_path)) + proc = spr.SubprocessRunner(f"{getcap_bin_path:s} {bin_path:s}") if proc.run() != 0: logger.error(proc.stderr) sys.exit(proc.returncode) @@ -56,9 +56,9 @@ def _has_capabilies(bin_path, capabilities): has_capabilies = True for capability in capabilities: if re.search(capability, getcap_output): - logger.debug("{:s} has {:s} capability".format(bin_path, capability)) + logger.debug(f"{bin_path:s} has {capability:s} capability") else: - logger.debug("{:s} has no {:s} capability".format(bin_path, capability)) + logger.debug(f"{bin_path:s} has no {capability:s} capability") has_capabilies = False capability = "+ep" diff --git a/tcconfig/_common.py b/tcconfig/_common.py index ae27cb8..3cf7d43 100644 --- a/tcconfig/_common.py +++ b/tcconfig/_common.py @@ -47,7 +47,7 @@ def _to_regular_bin_path(file_path): _bin_path_cache[command] = bin_path.abspath() return _bin_path_cache[command] - for sbin_path in ("/sbin/{:s}".format(command), "/usr/sbin/{:s}".format(command)): + for sbin_path in (f"/sbin/{command:s}", f"/usr/sbin/{command:s}"): if os.path.isfile(sbin_path): _bin_path_cache[command] = _to_regular_bin_path(sbin_path) return _bin_path_cache[command] @@ -60,7 +60,7 @@ def check_command_installation(command): if find_bin_path(command): return - logger.error("command not found: {}".format(command)) + logger.error(f"command not found: {command}") sys.exit(errno.ENOENT) @@ -88,20 +88,20 @@ def validate_within_min_max(param_name, value, min_value, max_value, unit): if unit is None: unit = "" else: - unit = "[{:s}]".format(unit) + unit = f"[{unit:s}]" if value > max_value: raise ParameterError( - "'{:s}' is too high".format(param_name), - expected="<={:s}{:s}".format(DataProperty(max_value).to_str(), unit), - value="{:s}{:s}".format(DataProperty(value).to_str(), unit), + f"'{param_name:s}' is too high", + expected=f"<={DataProperty(max_value).to_str():s}{unit:s}", + value=f"{DataProperty(value).to_str():s}{unit:s}", ) if value < min_value: raise ParameterError( - "'{:s}' is too low".format(param_name), - expected=">={:s}{:s}".format(DataProperty(min_value).to_str(), unit), - value="{:s}{:s}".format(DataProperty(value).to_str(), unit), + f"'{param_name:s}' is too low", + expected=f">={DataProperty(min_value).to_str():s}{unit:s}", + value=f"{DataProperty(value).to_str():s}{unit:s}", ) @@ -137,8 +137,8 @@ def run_command_helper( error_msg = "\n".join( [ "command execution failed", - " command={}".format(command), - " stderr={}".format(runner.stderr), + f" command={command}", + f" stderr={runner.stderr}", ] ) diff --git a/tcconfig/_docker.py b/tcconfig/_docker.py index 4c0980a..df6ef83 100644 --- a/tcconfig/_docker.py +++ b/tcconfig/_docker.py @@ -107,18 +107,16 @@ def create_veth_table(self, container): sys.exit(errno.EPERM) container_info = self.extract_container_info(container) - logger.debug( - "found container: name={}, pid={}".format(container_info.name, container_info.pid) - ) + logger.debug(f"found container: name={container_info.name}, pid={container_info.pid}") try: netns_path = self.__get_netns_path(container_info.name) if not os.path.lexists(netns_path): - logger.debug("make symlink to {}".format(netns_path)) + logger.debug(f"make symlink to {netns_path}") try: - Path("/proc/{:d}/ns/net".format(container_info.pid)).symlink(netns_path) + Path(f"/proc/{container_info.pid:d}/ns/net").symlink(netns_path) except PermissionError as e: logger.error(e) sys.exit(errno.EPERM) @@ -179,7 +177,7 @@ def __create_ifindex_table(self, container_name): IfIndex.create() proc = SubprocessRunner( - "ip netns exec {ns} ip link show type veth".format(ns=container_name), + f"ip netns exec {container_name} ip link show type veth", dry_run=False, ) if proc.run() != 0: @@ -195,7 +193,7 @@ def __create_ifindex_table(self, container_name): if not match: continue - logger.debug("parse veth @{} [{:02d}] {}".format(container_name, i, line)) + logger.debug(f"parse veth @{container_name} [{i:02d}] {line}") ifindex, ifname, peer_ifindex = match.groups() IfIndex.insert( IfIndex( @@ -212,7 +210,7 @@ def __create_ifindex_table(self, container_name): return proc.returncode for line in proc.stdout.splitlines(): - logger.debug("parse veth @docker-host [{:02d}] {}".format(i, line)) + logger.debug(f"parse veth @docker-host [{i:02d}] {line}") match = veth_groups_regexp.search(line) if not match: continue diff --git a/tcconfig/_error.py b/tcconfig/_error.py index 20fe225..d830ded 100644 --- a/tcconfig/_error.py +++ b/tcconfig/_error.py @@ -22,7 +22,7 @@ def __str__(self, *args, **kwargs): item_list = [Exception.__str__(self, *args, **kwargs)] if self._target: - item_list.append("{} not found: {}".format(self._target_type, self._target)) + item_list.append(f"{self._target_type} not found: {self._target}") return " ".join(item_list).strip() diff --git a/tcconfig/_importer.py b/tcconfig/_importer.py index 56466c4..c2a407d 100644 --- a/tcconfig/_importer.py +++ b/tcconfig/_importer.py @@ -42,9 +42,8 @@ def load_tcconfig(self, config_file_path): self.__config_table = json.load(fp) schema(self.__config_table) - self.__logger.debug( - "tc config file: {:s}".format(json.dumps(self.__config_table, indent=4)) - ) + dumped_config = json.dumps(self.__config_table, indent=4) + self.__logger.debug(f"tc config file: {dumped_config:s}") def get_tcconfig_commands(self): from .tcset import get_arg_parser @@ -76,18 +75,16 @@ def get_tcconfig_commands(self): if not filter_table: continue - option_list = [device, "--direction={:s}".format(direction)] + ( + option_list = [device, f"--direction={direction:s}"] + ( ["--docker"] if is_container else [] ) for key, value in filter_table.items(): - arg_item = "--{:s}={}".format(key, value) + arg_item = f"--{key:s}={value}" parse_result = get_arg_parser().parse_known_args(["dummy", arg_item]) if parse_result[1]: - self.__logger.debug( - "unknown parameter: key={}, value={}".format(key, value) - ) + self.__logger.debug(f"unknown parameter: key={key}, value={value}") continue option_list.append(arg_item) @@ -98,7 +95,7 @@ def get_tcconfig_commands(self): Network.Ipv4.ANYWHERE, Network.Ipv6.ANYWHERE, ): - option_list.append("--src-network={:s}".format(src_network)) + option_list.append(f"--src-network={src_network:s}") except pp.ParseException: pass @@ -108,19 +105,19 @@ def get_tcconfig_commands(self): Network.Ipv4.ANYWHERE, Network.Ipv6.ANYWHERE, ): - option_list.append("--dst-network={:s}".format(dst_network)) + option_list.append(f"--dst-network={dst_network:s}") except pp.ParseException: pass try: src_port = self.__parse_tc_filter_src_port(tc_filter) - option_list.append("--src-port={}".format(src_port)) + option_list.append(f"--src-port={src_port}") except pp.ParseException: pass try: dst_port = self.__parse_tc_filter_dst_port(tc_filter) - option_list.append("--dst-port={}".format(dst_port)) + option_list.append(f"--dst-port={dst_port}") except pp.ParseException: pass @@ -140,7 +137,7 @@ def get_tcconfig_commands(self): @staticmethod def __parse_tc_filter_src_network(text): - network_pattern = pp.SkipTo("{:s}=".format(Tc.Param.SRC_NETWORK), include=True) + pp.Word( + network_pattern = pp.SkipTo(f"{Tc.Param.SRC_NETWORK:s}=", include=True) + pp.Word( pp.alphanums + "." + "/" ) @@ -148,7 +145,7 @@ def __parse_tc_filter_src_network(text): @staticmethod def __parse_tc_filter_dst_network(text): - network_pattern = pp.SkipTo("{:s}=".format(Tc.Param.DST_NETWORK), include=True) + pp.Word( + network_pattern = pp.SkipTo(f"{Tc.Param.DST_NETWORK:s}=", include=True) + pp.Word( pp.alphanums + "." + "/" ) @@ -156,13 +153,13 @@ def __parse_tc_filter_dst_network(text): @staticmethod def __parse_tc_filter_src_port(text): - port_pattern = pp.SkipTo("{:s}=".format(Tc.Param.SRC_PORT), include=True) + pp.Word(pp.nums) + port_pattern = pp.SkipTo(f"{Tc.Param.SRC_PORT:s}=", include=True) + pp.Word(pp.nums) return port_pattern.parseString(text)[-1] @staticmethod def __parse_tc_filter_dst_port(text): - port_pattern = pp.SkipTo("{:s}=".format(Tc.Param.DST_PORT), include=True) + pp.Word(pp.nums) + port_pattern = pp.SkipTo(f"{Tc.Param.DST_PORT:s}=", include=True) + pp.Word(pp.nums) return port_pattern.parseString(text)[-1] diff --git a/tcconfig/_iptables.py b/tcconfig/_iptables.py index bc9167a..8151451 100644 --- a/tcconfig/_iptables.py +++ b/tcconfig/_iptables.py @@ -27,7 +27,7 @@ def get_iptables_base_command(): return iptables_path # debian/ubuntu may return /sbin/xtables-multi - return "{:s} iptables".format(iptables_path) + return f"{iptables_path:s} iptables" return None @@ -74,7 +74,7 @@ def __init__( self.__protocol = protocol if chain not in VALID_CHAIN_LIST: - raise ValueError("invalid chain: {}".format(chain)) + raise ValueError(f"invalid chain: {chain}") self.__chain = chain @@ -93,15 +93,15 @@ def __repr__(self, *args, **kwargs): str_list = [] if Integer(self.line_number).is_type(): - str_list.append("line-num={}".format(self.line_number)) + str_list.append(f"line-num={self.line_number}") str_list.extend( [ - "protocol={:s}".format(self.protocol), - "source={:s}".format(self.source), - "destination={:s}".format(self.destination), - "mark_id={:d}".format(self.mark_id), - "chain={:s}".format(self.chain), + f"protocol={self.protocol:s}", + f"source={self.source:s}", + f"destination={self.destination:s}", + f"mark_id={self.mark_id:d}", + f"chain={self.chain:s}", ] ) @@ -111,16 +111,16 @@ def to_append_command(self): Integer(self.mark_id).validate() command_item_list = [ - "{:s} -A {:s} -t mangle -j MARK".format(get_iptables_base_command(), self.chain), - "--set-mark {}".format(self.mark_id), + f"{get_iptables_base_command():s} -A {self.chain:s} -t mangle -j MARK", + f"--set-mark {self.mark_id}", ] if typepy.is_not_null_string(self.protocol) or Integer(self.protocol).is_type(): - command_item_list.append("-p {}".format(self.protocol)) + command_item_list.append(f"-p {self.protocol}") if self.__is_valid_srcdst(self.source): - command_item_list.append("-s {:s}".format(self.source)) + command_item_list.append(f"-s {self.source:s}") if self.__is_valid_srcdst(self.destination): - command_item_list.append("-d {:s}".format(self.destination)) + command_item_list.append(f"-d {self.destination:s}") return " ".join(command_item_list) @@ -167,9 +167,7 @@ def clear(self): def get_iptables(self): self.__check_execution_authority() - proc = SubprocessRunner( - "{:s} {:s}".format(get_iptables_base_command(), LIST_MANGLE_TABLE_OPTION) - ) + proc = SubprocessRunner(f"{get_iptables_base_command():s} {LIST_MANGLE_TABLE_OPTION:s}") if proc.run() != 0: raise OSError(proc.returncode, proc.stderr) @@ -179,7 +177,7 @@ def get_unique_mark_id(self): self.__check_execution_authority() mark_id_list = [mangle.mark_id for mangle in self.parse()] - logger.debug("mangle mark list: {}".format(mark_id_list)) + logger.debug(f"mangle mark list: {mark_id_list}") unique_mark_id = 1 + self.__MARK_ID_OFFSET while unique_mark_id < self.__MAX_MARK_ID: diff --git a/tcconfig/_main.py b/tcconfig/_main.py index ffa4d15..439b375 100644 --- a/tcconfig/_main.py +++ b/tcconfig/_main.py @@ -72,4 +72,4 @@ def _dump_history(self, tc, tc_command): write_tc_script(tc_command, command_history, filename_suffix=filename_suffix) return - logger.debug("command history\n{}".format(command_history)) + logger.debug(f"command history\n{command_history}") diff --git a/tcconfig/_netem_param.py b/tcconfig/_netem_param.py index a159727..ea3aa40 100644 --- a/tcconfig/_netem_param.py +++ b/tcconfig/_netem_param.py @@ -72,7 +72,7 @@ def __init__( "normal" if latency_distribution is None else latency_distribution ) if self.__latency_distribution not in DELAY_DISTRIBUTIONS: - raise ValueError("latency_distribution must be one of {}".format(DELAY_DISTRIBUTIONS)) + raise ValueError(f"latency_distribution must be one of {DELAY_DISTRIBUTIONS}") def __normalize_bandwidth_rate(self, bandwidth_rate): if not bandwidth_rate: @@ -140,40 +140,40 @@ def validate_bandwidth_rate(self): if hr_bps.bps < 8: raise hr.ParameterError( "bandwidth rate must be greater or equals to 8bps", - value="{}bps".format(hr_bps.bps), + value=f"{hr_bps.bps}bps", ) upper_limit_rate = get_upper_limit_rate(self.__device) if hr_bps > upper_limit_rate: raise hr.ParameterError( "exceed bandwidth rate limit", - value="{} kbps".format(hr_bps.kilo_bps), - expected="less than {} kbps".format(upper_limit_rate.kilo_bps), + value=f"{hr_bps.kilo_bps} kbps", + expected=f"less than {upper_limit_rate.kilo_bps} kbps", ) def make_param_name(self): item_list = [self.__device] if self.bandwidth_rate: - item_list.append("rate{}kbps".format(int(self.bandwidth_rate.kilo_bps))) + item_list.append(f"rate{int(self.bandwidth_rate.kilo_bps)}kbps") if self.__latency_time and self.__latency_time.milliseconds > 0: - item_list.append("delay{}".format(self.__latency_time.milliseconds)) + item_list.append(f"delay{self.__latency_time.milliseconds}") if self.__latency_distro_time and self.__latency_distro_time.milliseconds > 0: - item_list.append("distro{}".format(self.__latency_distro_time.milliseconds)) + item_list.append(f"distro{self.__latency_distro_time.milliseconds}") if self.__packet_loss_rate: - item_list.append("loss{}".format(self.__packet_loss_rate)) + item_list.append(f"loss{self.__packet_loss_rate}") if self.__packet_duplicate_rate: - item_list.append("duplicate{}".format(self.__packet_duplicate_rate)) + item_list.append(f"duplicate{self.__packet_duplicate_rate}") if self.__corruption_rate: - item_list.append("corrupt{}".format(self.__corruption_rate)) + item_list.append(f"corrupt{self.__corruption_rate}") if self.__reordering_rate: - item_list.append("reordering{}".format(self.__reordering_rate)) + item_list.append(f"reordering{self.__reordering_rate}") return "_".join(item_list) @@ -181,13 +181,13 @@ def make_netem_command_parts(self): item_list = ["netem"] if self.__packet_loss_rate > 0: - item_list.append("loss {:f}%".format(self.__packet_loss_rate)) + item_list.append(f"loss {self.__packet_loss_rate:f}%") if self.__packet_duplicate_rate > 0: - item_list.append("duplicate {:f}%".format(self.__packet_duplicate_rate)) + item_list.append(f"duplicate {self.__packet_duplicate_rate:f}%") if self.__latency_time and self.__latency_time > hr.Time(Tc.ValueRange.LatencyTime.MIN): - item_list.append("delay {}ms".format(self.__latency_time.milliseconds)) + item_list.append(f"delay {self.__latency_time.milliseconds}ms") if self.__latency_distro_time and self.__latency_distro_time > hr.Time( Tc.ValueRange.LatencyTime.MIN @@ -200,10 +200,10 @@ def make_netem_command_parts(self): ) if self.__corruption_rate > 0: - item_list.append("corrupt {:f}%".format(self.__corruption_rate)) + item_list.append(f"corrupt {self.__corruption_rate:f}%") if self.__reordering_rate > 0: - item_list.append("reorder {:f}%".format(self.__reordering_rate)) + item_list.append(f"reorder {self.__reordering_rate:f}%") return " ".join(item_list) @@ -224,7 +224,7 @@ def __validate_network_delay(self): max_value=Tc.ValueRange.LatencyTime.MAX, ) except hr.ParameterError as e: - raise hr.ParameterError("delay {}".format(e)) + raise hr.ParameterError(f"delay {e}") if self.__latency_distro_time: try: @@ -233,7 +233,7 @@ def __validate_network_delay(self): max_value=Tc.ValueRange.LatencyTime.MAX, ) except hr.ParameterError as e: - raise hr.ParameterError("delay-distro {}".format(e)) + raise hr.ParameterError(f"delay-distro {e}") def __validate_packet_loss_rate(self): validate_within_min_max( diff --git a/tcconfig/_network.py b/tcconfig/_network.py index 2fa9411..21e7029 100644 --- a/tcconfig/_network.py +++ b/tcconfig/_network.py @@ -19,7 +19,7 @@ def get_anywhere_network(ip_version): if ip_version_n == 6: return Network.Ipv6.ANYWHERE - raise ValueError("unknown ip version: {}".format(ip_version)) + raise ValueError(f"unknown ip version: {ip_version}") def _get_iproute2_upper_limite_rate(): @@ -36,7 +36,7 @@ def _get_iproute2_upper_limite_rate(): def _read_iface_speed(tc_device): - with open("/sys/class/net/{:s}/speed".format(tc_device)) as f: + with open(f"/sys/class/net/{tc_device:s}/speed") as f: return int(f.read().strip()) @@ -55,7 +55,7 @@ def get_upper_limit_rate(tc_device): return _get_iproute2_upper_limite_rate() return min( - hr.BitsPerSecond("{}Mbps".format(speed_value)), + hr.BitsPerSecond(f"{speed_value}Mbps"), _get_iproute2_upper_limite_rate(), ) @@ -72,7 +72,7 @@ def is_anywhere_network(network, ip_version): if ip_version == 6: return network in (get_anywhere_network(ip_version), "0:0:0:0:0:0:0:0/0") - raise ValueError("invalid ip version: {}".format(ip_version)) + raise ValueError(f"invalid ip version: {ip_version}") def sanitize_network(network, ip_version): @@ -105,7 +105,7 @@ def sanitize_network(network, ip_version): if ip_version == 6: return ipaddress.IPv6Network(str(network)).compressed - raise ValueError("unexpected ip version: {}".format(ip_version)) + raise ValueError(f"unexpected ip version: {ip_version}") def verify_network_interface(device, tc_command_output): diff --git a/tcconfig/_shaping_rule_finder.py b/tcconfig/_shaping_rule_finder.py index 1ae10f4..65d9bb5 100644 --- a/tcconfig/_shaping_rule_finder.py +++ b/tcconfig/_shaping_rule_finder.py @@ -39,13 +39,13 @@ def find_qdisc_handle(self, parent): def find_filter_param(self): where_query = And(self.__get_filter_conditions()) table_name = Filter.get_table_name() - self.__logger.debug("find filter param: table={}, where={}".format(table_name, where_query)) + self.__logger.debug(f"find filter param: table={table_name}, where={where_query}") for record in Filter.select(where=where_query): return record.as_dict() self.__logger.debug( - "find filter param: empty result (table={}, where={})".format(table_name, where_query) + f"find filter param: empty result (table={table_name}, where={where_query})" ) return None diff --git a/tcconfig/_tc_command_helper.py b/tcconfig/_tc_command_helper.py index 7b111e6..6d4af0a 100644 --- a/tcconfig/_tc_command_helper.py +++ b/tcconfig/_tc_command_helper.py @@ -21,9 +21,7 @@ def run_tc_show(subcommand, device, tc_command_output): verify_network_interface(device, tc_command_output) - runner = spr.SubprocessRunner( - "{:s} show dev {:s}".format(get_tc_base_command(subcommand), device) - ) + runner = spr.SubprocessRunner(f"{get_tc_base_command(subcommand):s} show dev {device:s}") if runner.run() != 0 and runner.stderr.find("Cannot find device") != -1: # reach here if the device does not exist at the system and netiface # not installed. diff --git a/tcconfig/_tc_script.py b/tcconfig/_tc_script.py index dc8f047..95cab0f 100644 --- a/tcconfig/_tc_script.py +++ b/tcconfig/_tc_script.py @@ -27,14 +27,14 @@ def write_tc_script(tcconfig_command, command_history, filename_suffix=None): "# command sequence in this script attempt to simulate the following " "tcconfig command:", "#", - "# {:s}".format(org_tcconfig_cmd), + f"# {org_tcconfig_cmd:s}", ] ) script_line_list.extend( [ "#", - "# the script execution result may different from '{}'".format(org_tcconfig_cmd), + f"# the script execution result may different from '{org_tcconfig_cmd}'", "#", "# created by {:s} on {:s}.".format( tcconfig_command, @@ -50,7 +50,7 @@ def write_tc_script(tcconfig_command, command_history, filename_suffix=None): fp.write("\n".join(script_line_list) + "\n") os.chmod(filename, 0o755) - logger.info("written a tc script to '{:s}'".format(filename)) + logger.info(f"written a tc script to '{filename:s}'") def _get_original_tcconfig_command(tcconfig_command): diff --git a/tcconfig/parser/_class.py b/tcconfig/parser/_class.py index f8ce55b..6f50b75 100644 --- a/tcconfig/parser/_class.py +++ b/tcconfig/parser/_class.py @@ -48,15 +48,13 @@ def parse(self, device, text): self.__parse_classid(line) self.__parse_rate(line) - logger.debug("parse a class entry: {}".format(self.__parsed_param)) + logger.debug(f"parse a class entry: {self.__parsed_param}") entry_list.append(self.__parsed_param) if entry_list: self._con.create_table_from_data_matrix(self._tc_subcommand, self.Key.LIST, entry_list) - logger.debug( - "tc {:s} parse result: {}".format(self._tc_subcommand, json.dumps(entry_list, indent=4)) - ) + logger.debug(f"tc {self._tc_subcommand:s} parse result: {json.dumps(entry_list, indent=4)}") return entry_list @@ -65,9 +63,9 @@ def _clear(self): def __parse_classid(self, line): self.__parsed_param[self.Key.CLASS_ID] = None - tag = "class {:s} ".format(ShapingAlgorithm.HTB) + tag = f"class {ShapingAlgorithm.HTB:s} " - match = re.search("{:s}{:s}".format(tag, self.Pattern.CLASS_ID), line) + match = re.search(f"{tag:s}{self.Pattern.CLASS_ID:s}", line) if match is None: return @@ -78,7 +76,7 @@ def __parse_classid(self, line): def __parse_rate(self, line): self.__parsed_param[self.Key.RATE] = None - match = re.search("rate {:s}".format(self.Pattern.RATE), line) + match = re.search(f"rate {self.Pattern.RATE:s}", line) if match is None: return diff --git a/tcconfig/parser/_filter.py b/tcconfig/parser/_filter.py index cd6d2ff..d9a8a18 100644 --- a/tcconfig/parser/_filter.py +++ b/tcconfig/parser/_filter.py @@ -89,7 +89,7 @@ def parse(self, device, text): try: self.__parse_mangle_mark(line) except pp.ParseException: - logger.debug("failed to parse mangle: {}".format(line)) + logger.debug(f"failed to parse mangle: {line}") else: Filter.insert( Filter( @@ -112,7 +112,7 @@ def parse(self, device, text): self.__parse_filter_id(line) if tc_filter.flowid: - logger.debug("store filter: {}".format(tc_filter)) + logger.debug(f"store filter: {tc_filter}") Filter.insert(tc_filter) self._clear() @@ -124,7 +124,7 @@ def parse(self, device, text): continue except pp.ParseException: - logger.debug("failed to parse flow id: {}".format(line)) + logger.debug(f"failed to parse flow id: {line}") try: if self.__ip_version == 4: @@ -132,9 +132,9 @@ def parse(self, device, text): elif self.__ip_version == 6: self.__parse_filter_ipv6(line) else: - raise ValueError("unknown ip version: {}".format(self.__ip_version)) + raise ValueError(f"unknown ip version: {self.__ip_version}") except pp.ParseException: - logger.debug("failed to parse filter: {}".format(line)) + logger.debug(f"failed to parse filter: {line}") if self.__flow_id: Filter.insert(self.__get_filter()) @@ -181,28 +181,22 @@ def __get_filter(self): def __parse_flow_id(self, line): parsed_list = self.__FILTER_FLOWID_PATTERN.parseString(line) self.__flow_id = parsed_list[-1] - logger.debug("succeed to parse flow id: flow-id={}, line={}".format(self.__flow_id, line)) + logger.debug(f"succeed to parse flow id: flow-id={self.__flow_id}, line={line}") def __parse_protocol(self, line): parsed_list = self.__FILTER_PROTOCOL_PATTERN.parseString(line) self.__protocol = parsed_list[-1] - logger.debug( - "succeed to parse protocol: protocol={}, line={}".format(self.__protocol, line) - ) + logger.debug(f"succeed to parse protocol: protocol={self.__protocol}, line={line}") def __parse_priority(self, line): parsed_list = self.__FILTER_PRIORITY_PATTERN.parseString(line) self.__priority = int(parsed_list[-1]) - logger.debug( - "succeed to parse priority: priority={}, line={}".format(self.__priority, line) - ) + logger.debug(f"succeed to parse priority: priority={self.__priority}, line={line}") def __parse_filter_id(self, line): parsed_list = self.__FILTER_ID_PATTERN.parseString(line) self.__filter_id = parsed_list[-1] - logger.debug( - "succeed to parse filter id: filter-id={}, line={}".format(self.__filter_id, line) - ) + logger.debug(f"succeed to parse filter id: filter-id={self.__filter_id}, line={line}") def __parse_mangle_mark(self, line): parsed_list = self.__FILTER_MANGLE_MARK_PATTERN.parseString(line) @@ -224,14 +218,14 @@ def __parse_filter_ip_line(self, line): def __parse_filter_ipv4_network(self, value_hex, mask_hex, match_id): ipaddr = ".".join([str(int(value_hex[i : i + 2], 16)) for i in range(0, len(value_hex), 2)]) netmask = bin(int(mask_hex, 16)).count("1") - network = "{:s}/{:d}".format(ipaddr, netmask) + network = f"{ipaddr:s}/{netmask:d}" if match_id == self.FilterMatchIdIpv4.INCOMING_NETWORK: self.__filter_src_network = network elif match_id == self.FilterMatchIdIpv4.OUTGOING_NETWORK: self.__filter_dst_network = network else: - logger.warning("unknown match id: {}".format(match_id)) + logger.warning(f"unknown match id: {match_id}") def __parse_filter_ipv6_network(self, value_hex, mask_hex, match_id): from collections import namedtuple @@ -294,7 +288,7 @@ def __parse_filter_ipv6_network(self, value_hex, mask_hex, match_id): dst_octet_list.extend(ipv6_entry.octet_list) dst_netmask += part_netmask else: - raise ValueError("unexpected ipv6 entry: {}".format(ipv6_entry)) + raise ValueError(f"unexpected ipv6 entry: {ipv6_entry}") while len(src_octet_list) < 8: src_octet_list.append("0000") @@ -314,14 +308,12 @@ def __parse_filter_port(self, value_hex): # the bottom-half represents destination port filter. if len(value_hex) != 8: - raise ValueError("invalid port filter value: {}".format(value_hex)) + raise ValueError(f"invalid port filter value: {value_hex}") src_port_hex = value_hex[:4] dst_port_hex = value_hex[4:] - logger.debug( - "parse ipv4 port: src-port-hex={}, dst-port-hex={}".format(src_port_hex, dst_port_hex) - ) + logger.debug(f"parse ipv4 port: src-port-hex={src_port_hex}, dst-port-hex={dst_port_hex}") src_port_decimal = int(src_port_hex, 16) self.__filter_src_port = src_port_decimal if src_port_decimal != 0 else None @@ -350,18 +342,18 @@ def __parse_filter_ipv4(self, line): ) return else: - logger.debug("unknown match id: {}".format(match_id)) + logger.debug(f"unknown match id: {match_id}") return logger.debug( "succeed to parse ipv4 filter: " + ", ".join( [ - "src_network={}".format(self.__filter_src_network), - "dst_network={}".format(self.__filter_dst_network), - "src_port={}".format(self.__filter_src_port), - "dst_port={}".format(self.__filter_dst_port), - "line={}".format(line), + f"src_network={self.__filter_src_network}", + f"dst_network={self.__filter_dst_network}", + f"src_port={self.__filter_src_port}", + f"dst_port={self.__filter_dst_port}", + f"line={line}", ] ) ) @@ -377,18 +369,18 @@ def __parse_filter_ipv6(self, line): elif match_id == self.FilterMatchIdIpv6.PORT: self.__parse_filter_port(value_hex) else: - logger.debug("unknown match id: {}".format(match_id)) + logger.debug(f"unknown match id: {match_id}") return logger.debug( "succeed to parse ipv6 filter: " + ", ".join( [ - "src_network={}".format(self.__filter_src_network), - "dst_network={}".format(self.__filter_dst_network), - "src_port={}".format(self.__filter_src_port), - "dst_port={}".format(self.__filter_dst_port), - "line={}".format(line), + f"src_network={self.__filter_src_network}", + f"dst_network={self.__filter_dst_network}", + f"src_port={self.__filter_src_port}", + f"dst_port={self.__filter_dst_port}", + f"line={line}", ] ) ) diff --git a/tcconfig/parser/_qdisc.py b/tcconfig/parser/_qdisc.py index 5d3bd90..899c532 100644 --- a/tcconfig/parser/_qdisc.py +++ b/tcconfig/parser/_qdisc.py @@ -61,7 +61,7 @@ def parse(self, device, text): self.__parse_netem_param(line, "reorder", pp.nums + ".%") self.__parse_bandwidth_rate(line) - logger.debug("parse a qdisc entry: {}".format(self.__parsed_param)) + logger.debug(f"parse a qdisc entry: {self.__parsed_param}") Qdisc.insert(Qdisc(**self.__parsed_param)) diff --git a/tcconfig/parser/shaping_rule.py b/tcconfig/parser/shaping_rule.py index 712053b..cb89819 100644 --- a/tcconfig/parser/shaping_rule.py +++ b/tcconfig/parser/shaping_rule.py @@ -129,7 +129,7 @@ def __get_ifb_from_device(self): return None filter_runner = subprocrunner.SubprocessRunner( - "{:s} show dev {:s} root".format(get_tc_base_command(TcSubCommand.FILTER), self.device), + f"{get_tc_base_command(TcSubCommand.FILTER):s} show dev {self.device:s} root", error_log_level=LogLevel.QUIET, dry_run=False, ) @@ -157,7 +157,7 @@ def __get_filter_key(self, filter_param): break else: - raise ValueError("mangle mark not found: {}".format(mangle)) + raise ValueError(f"mangle mark not found: {mangle}") else: src_network = filter_param.get(Tc.Param.SRC_NETWORK) if typepy.is_not_null_string(src_network) and not is_anywhere_network( @@ -173,7 +173,7 @@ def __get_filter_key(self, filter_param): src_port = filter_param.get(Tc.Param.SRC_PORT) if typepy.Integer(src_port).is_type(): - key_items[Tc.Param.SRC_PORT] = "{}".format(src_port) + key_items[Tc.Param.SRC_PORT] = f"{src_port}" elif src_port is not None: self.__logger.warning( "expected a integer value for {}, actual {}: {}".format( @@ -183,7 +183,7 @@ def __get_filter_key(self, filter_param): dst_port = filter_param.get(Tc.Param.DST_PORT) if typepy.Integer(dst_port).is_type(): - key_items[Tc.Param.DST_PORT] = "{}".format(dst_port) + key_items[Tc.Param.DST_PORT] = f"{dst_port}" elif src_port is not None: self.__logger.warning( "expected a integer value for {}, actual {}".format( @@ -195,7 +195,7 @@ def __get_filter_key(self, filter_param): if typepy.is_not_null_string(protocol): key_items[Tc.Param.PROTOCOL] = protocol - key = ", ".join(["{}={}".format(key, value) for key, value in key_items.items()]) + key = ", ".join([f"{key}={value}" for key, value in key_items.items()]) return key, key_items @@ -223,12 +223,12 @@ def __get_shaping_rule(self, device): for filter_param in filter_params: filter_param = filter_param.as_dict() - self.__logger.debug("{:s} param: {}".format(TcSubCommand.FILTER, filter_param)) + self.__logger.debug(f"{TcSubCommand.FILTER:s} param: {filter_param}") shaping_rule = {} filter_key, rule_with_keys = self.__get_filter_key(filter_param) if typepy.is_null_string(filter_key): - self.__logger.debug("empty filter key: {}".format(filter_param)) + self.__logger.debug(f"empty filter key: {filter_param}") continue qdisc_id = filter_param.get(Tc.Param.FLOW_ID) @@ -244,7 +244,7 @@ def __get_shaping_rule(self, device): for qdisc_param in qdisc_params: qdisc_param = qdisc_param.as_dict() - self.__logger.debug("{:s} param: {}".format(TcSubCommand.QDISC, qdisc_param)) + self.__logger.debug(f"{TcSubCommand.QDISC:s} param: {qdisc_param}") if self.is_parse_filter_id: shaping_rule[Tc.Param.FILTER_ID] = filter_param.get(Tc.Param.FILTER_ID) @@ -265,7 +265,7 @@ def __get_shaping_rule(self, device): ) for class_param in class_params: - self.__logger.debug("{:s} param: {}".format(TcSubCommand.CLASS, class_param)) + self.__logger.debug(f"{TcSubCommand.CLASS:s} param: {class_param}") if class_param.get(Tc.Param.CLASS_ID) not in ( filter_param.get(Tc.Param.FLOW_ID), @@ -284,10 +284,10 @@ def __get_shaping_rule(self, device): ) if not shaping_rule: - self.__logger.debug("shaping rule not found for '{}'".format(filter_param)) + self.__logger.debug(f"shaping rule not found for '{filter_param}'") continue - self.__logger.debug("shaping rule found: {} {}".format(filter_key, shaping_rule)) + self.__logger.debug(f"shaping rule found: {filter_key} {shaping_rule}") rule_with_keys.update(shaping_rule) shaping_rules.append(rule_with_keys) diff --git a/tcconfig/shaper/_interface.py b/tcconfig/shaper/_interface.py index 8247faa..bb69ca2 100644 --- a/tcconfig/shaper/_interface.py +++ b/tcconfig/shaper/_interface.py @@ -30,11 +30,11 @@ def set_shaping(self): # pragma: no cover class AbstractShaper(ShaperInterface): @property def _tc_device(self): - return "{:s}".format(self._tc_obj.get_tc_device()) + return f"{self._tc_obj.get_tc_device():s}" @property def _dev(self): - return "dev {:s}".format(self._tc_device) + return f"dev {self._tc_device:s}" @property def _existing_parent(self): @@ -63,17 +63,17 @@ def __init__(self, tc_obj): def _set_netem(self): base_command = self._tc_obj.get_tc_command(TcSubCommand.QDISC) parent = self._get_tc_parent( - "{:s}:{:d}".format(self._tc_obj.qdisc_major_id_str, self._get_qdisc_minor_id()) + f"{self._tc_obj.qdisc_major_id_str:s}:{self._get_qdisc_minor_id():d}" ) handle = self._get_tc_handle( - "{:x}:".format(self._get_netem_qdisc_major_id(self._tc_obj.qdisc_major_id)) + f"{self._get_netem_qdisc_major_id(self._tc_obj.qdisc_major_id):x}:" ) command_item_list = [ base_command, - "dev {:s}".format(self._tc_obj.get_tc_device()), - "parent {:s}".format(parent), - "handle {:s}".format(handle), + f"dev {self._tc_obj.get_tc_device():s}", + f"parent {parent:s}", + f"handle {handle:s}", self._tc_obj.netem_param.make_netem_command_parts(), ] @@ -111,13 +111,13 @@ def _add_filter(self): command_item_list = [ self._tc_obj.get_tc_command(TcSubCommand.FILTER), self._dev, - "protocol {:s}".format(self._tc_obj.protocol), - "parent {:s}:".format(self._tc_obj.qdisc_major_id_str), - "prio {:d}".format(self._get_filter_prio(is_exclude_filter=False)), + f"protocol {self._tc_obj.protocol:s}", + f"parent {self._tc_obj.qdisc_major_id_str:s}:", + f"prio {self._get_filter_prio(is_exclude_filter=False):d}", ] if self._is_use_iptables(): - command_item_list.append("handle {:d} fw".format(self._get_unique_mangle_mark_id())) + command_item_list.append(f"handle {self._get_unique_mangle_mark_id():d} fw") else: if typepy.is_null_string(self._tc_obj.dst_network): dst_network = get_anywhere_network(self._tc_obj.ip_version) @@ -153,7 +153,7 @@ def _add_filter(self): ) command_item_list.append( - "flowid {:s}:{:d}".format(self._tc_obj.qdisc_major_id_str, self._get_qdisc_minor_id()) + f"flowid {self._tc_obj.qdisc_major_id_str:s}:{self._get_qdisc_minor_id():d}" ) return subprocrunner.SubprocessRunner(" ".join(command_item_list)).run() diff --git a/tcconfig/shaper/htb.py b/tcconfig/shaper/htb.py index 1036b89..d5745c0 100644 --- a/tcconfig/shaper/htb.py +++ b/tcconfig/shaper/htb.py @@ -39,7 +39,7 @@ def __init__(self, tc_obj): def _get_qdisc_minor_id(self): if self.__qdisc_minor_id is None: self.__qdisc_minor_id = self.__get_unique_qdisc_minor_id() - logger.debug("__get_unique_qdisc_minor_id: {:d}".format(self.__qdisc_minor_id)) + logger.debug(f"__get_unique_qdisc_minor_id: {self.__qdisc_minor_id:d}") return self.__qdisc_minor_id @@ -54,7 +54,7 @@ def _make_qdisc(self): return 0 base_command = self._tc_obj.get_tc_command(TcSubCommand.QDISC) - handle = "{:s}:".format(self._tc_obj.qdisc_major_id_str) + handle = f"{self._tc_obj.qdisc_major_id_str:s}:" if self._tc_obj.is_add_shaping_rule: message = None @@ -75,9 +75,9 @@ def _make_qdisc(self): base_command, self._dev, "root", - "handle {:s}".format(handle), + f"handle {handle:s}", self.algorithm_name, - "default {:d}".format(self.__DEFAULT_CLASS_MINOR_ID), + f"default {self.__DEFAULT_CLASS_MINOR_ID:d}", ] ), ignore_error_msg_regexp=self._tc_obj.REGEXP_FILE_EXISTS, @@ -90,10 +90,10 @@ def _make_qdisc(self): def _add_rate(self): base_command = self._tc_obj.get_tc_command(TcSubCommand.CLASS) parent = "{:s}:".format( - self._get_tc_parent("{:s}:".format(self._tc_obj.qdisc_major_id_str)).split(":")[0] + self._get_tc_parent(f"{self._tc_obj.qdisc_major_id_str:s}:").split(":")[0] ) classid = self._get_tc_parent( - "{:s}:{:d}".format(self._tc_obj.qdisc_major_id_str, self._get_qdisc_minor_id()) + f"{self._tc_obj.qdisc_major_id_str:s}:{self._get_qdisc_minor_id():d}" ) upper_limit_rate = get_upper_limit_rate(self._tc_device) @@ -104,18 +104,18 @@ def _add_rate(self): command_item_list = [ base_command, self._dev, - "parent {:s}".format(parent), - "classid {:s}".format(classid), + f"parent {parent:s}", + f"classid {classid:s}", self.algorithm_name, - "rate {}Kbit".format(bandwidth.kilo_bps), - "ceil {}Kbit".format(bandwidth.kilo_bps), + f"rate {bandwidth.kilo_bps}Kbit", + f"ceil {bandwidth.kilo_bps}Kbit", ] if bandwidth != upper_limit_rate: command_item_list.extend( [ - "burst {}KB".format(bandwidth.kilo_byte_per_sec), - "cburst {}KB".format(bandwidth.kilo_byte_per_sec), + f"burst {bandwidth.kilo_byte_per_sec}KB", + f"cburst {bandwidth.kilo_byte_per_sec}KB", ] ) @@ -156,9 +156,9 @@ def _add_exclude_filter(self): command_item_list = [ self._tc_obj.get_tc_command(TcSubCommand.FILTER), self._dev, - "protocol {:s}".format(self._tc_obj.protocol), - "parent {:s}:".format(self._tc_obj.qdisc_major_id_str), - "prio {:d}".format(self._get_filter_prio(is_exclude_filter=True)), + f"protocol {self._tc_obj.protocol:s}", + f"parent {self._tc_obj.qdisc_major_id_str:s}:", + f"prio {self._get_filter_prio(is_exclude_filter=True):d}", "u32", ] @@ -190,7 +190,7 @@ def _add_exclude_filter(self): ) ) - command_item_list.append("flowid {:s}".format(self.__classid_wo_shaping)) + command_item_list.append(f"flowid {self.__classid_wo_shaping:s}") return subprocrunner.SubprocessRunner(" ".join(command_item_list)).run() @@ -253,10 +253,8 @@ def __get_unique_qdisc_minor_id(self): except typepy.TypeConversionError: continue - logger.debug("existing class list with {:s}: {}".format(self._dev, exist_class_item_list)) - logger.debug( - "existing minor classid list with {:s}: {}".format(self._dev, exist_class_minor_id_list) - ) + logger.debug(f"existing class list with {self._dev:s}: {exist_class_item_list}") + logger.debug(f"existing minor classid list with {self._dev:s}: {exist_class_minor_id_list}") next_minor_id = self.__DEFAULT_CLASS_MINOR_ID while True: @@ -274,7 +272,7 @@ def __extract_exist_netem_major_ids(self) -> List[int]: re.MULTILINE, ) - logger.debug("existing netem list with {:s}: {}".format(self._dev, exist_netem_items)) + logger.debug(f"existing netem list with {self._dev:s}: {exist_netem_items}") exist_netem_major_id_list = [] for netem_item in exist_netem_items: @@ -285,9 +283,7 @@ def __extract_exist_netem_major_ids(self) -> List[int]: def __get_unique_netem_major_id(self): exist_netem_major_ids = self.__extract_exist_netem_major_ids() - logger.debug( - "existing netem major id list with {:s}: {}".format(self._dev, exist_netem_major_ids) - ) + logger.debug(f"existing netem major id list with {self._dev:s}: {exist_netem_major_ids}") next_netem_major_id = self._tc_obj.netem_param.calc_device_qdisc_major_id() while True: @@ -300,8 +296,8 @@ def __get_unique_netem_major_id(self): def __add_default_class(self): base_command = self._tc_obj.get_tc_command(TcSubCommand.CLASS) - parent = "{:s}:".format(self._tc_obj.qdisc_major_id_str) - classid = "{:s}:{:d}".format(self._tc_obj.qdisc_major_id_str, self.__DEFAULT_CLASS_MINOR_ID) + parent = f"{self._tc_obj.qdisc_major_id_str:s}:" + classid = f"{self._tc_obj.qdisc_major_id_str:s}:{self.__DEFAULT_CLASS_MINOR_ID:d}" self.__classid_wo_shaping = classid if self._tc_obj.is_add_shaping_rule: @@ -324,10 +320,10 @@ def __add_default_class(self): [ base_command, self._dev, - "parent {:s}".format(parent), - "classid {:s}".format(classid), + f"parent {parent:s}", + f"classid {classid:s}", self.algorithm_name, - "rate {}kbit".format(get_upper_limit_rate(self._tc_device).kilo_bps), + f"rate {get_upper_limit_rate(self._tc_device).kilo_bps}kbit", ] ), ignore_error_msg_regexp=self._tc_obj.REGEXP_FILE_EXISTS, diff --git a/tcconfig/shaper/tbf.py b/tcconfig/shaper/tbf.py index 962d539..93d4262 100644 --- a/tcconfig/shaper/tbf.py +++ b/tcconfig/shaper/tbf.py @@ -47,10 +47,10 @@ def _get_netem_qdisc_major_id(self, base_id): def _make_qdisc(self): base_command = self._tc_obj.get_tc_command(TcSubCommand.QDISC) - handle = "{:s}:".format(self._tc_obj.qdisc_major_id_str) + handle = f"{self._tc_obj.qdisc_major_id_str:s}:" return run_command_helper( - " ".join([base_command, self._dev, "root", "handle {:s}".format(handle), "prio"]), + " ".join([base_command, self._dev, "root", f"handle {handle:s}", "prio"]), ignore_error_msg_regexp=self._tc_obj.REGEXP_FILE_EXISTS, notice_msg=self._tc_obj.EXISTS_MSG_TEMPLATE.format( "failed to '{command:s}': prio qdisc already exists " @@ -74,7 +74,7 @@ def _add_rate(self): self._get_netem_qdisc_major_id(self._tc_obj.qdisc_major_id), self._get_qdisc_minor_id(), ) - handle = "{:d}:".format(20) + handle = f"{20:d}:" upper_limit_rate = get_upper_limit_rate(self._tc_device) bandwidth = self._tc_obj.netem_param.bandwidth_rate @@ -85,10 +85,10 @@ def _add_rate(self): [ base_command, self._dev, - "parent {:s}".format(parent), - "handle {:s}".format(handle), + f"parent {parent:s}", + f"handle {handle:s}", self.algorithm_name, - "rate {}kbit".format(bandwidth.kilo_bps), + f"rate {bandwidth.kilo_bps}kbit", "buffer {:d}".format( max(int(bandwidth.kilo_bps), self.__MIN_BUFFER_BYTE) ), # [byte] @@ -138,23 +138,23 @@ def __set_pre_network_filter(self): not typepy.Integer(self._tc_obj.dst_port).is_type(), ] ): - flowid = "{:s}:{:d}".format(self._tc_obj.qdisc_major_id_str, self._get_qdisc_minor_id()) + flowid = f"{self._tc_obj.qdisc_major_id_str:s}:{self._get_qdisc_minor_id():d}" else: - flowid = "{:s}:2".format(self._tc_obj.qdisc_major_id_str) + flowid = f"{self._tc_obj.qdisc_major_id_str:s}:2" return SubprocessRunner( " ".join( [ self._tc_obj.get_tc_command(TcSubCommand.FILTER), self._dev, - "protocol {:s}".format(self._tc_obj.protocol), - "parent {:s}:".format(self._tc_obj.qdisc_major_id_str), + f"protocol {self._tc_obj.protocol:s}", + f"parent {self._tc_obj.qdisc_major_id_str:s}:", "prio 2 u32 match {:s} {:s} {:s}".format( self._tc_obj.protocol, self._get_network_direction_str(), get_anywhere_network(self._tc_obj.ip_version), ), - "flowid {:s}".format(flowid), + f"flowid {flowid:s}", ] ) ).run() diff --git a/tcconfig/tcdel.py b/tcconfig/tcdel.py index 1230b84..91679fa 100644 --- a/tcconfig/tcdel.py +++ b/tcconfig/tcdel.py @@ -100,7 +100,7 @@ def __create_tc_obj(self, tc_target): break else: logger.error( - "shaping rule not found associated with the id ({}).".format(options.filter_id) + f"shaping rule not found associated with the id ({options.filter_id})." ) sys.exit(1) else: diff --git a/tcconfig/tcshow.py b/tcconfig/tcshow.py index 640549c..f3f45b5 100644 --- a/tcconfig/tcshow.py +++ b/tcconfig/tcshow.py @@ -178,7 +178,7 @@ def extract_tc_params(options): export_settings(options.export_path, out_rules, in_rules) tc_params.update(rule_parser.get_tc_parameter()) - key = "{id} (device={veth})".format(id=container_info.id[:12], veth=veth) + key = f"{container_info.id[:12]} (device={veth})" tc_params[key] = tc_params.pop(veth) else: verify_network_interface(device, options.tc_command_output) @@ -228,7 +228,7 @@ def main(): ) return 0 - logger.debug("command history\n{}".format(command_history)) + logger.debug(f"command history\n{command_history}") print_tc(json.dumps(tc_params, ensure_ascii=False, indent=4), options.color) diff --git a/tcconfig/traffic_control.py b/tcconfig/traffic_control.py index efcef3c..cb1b9d2 100644 --- a/tcconfig/traffic_control.py +++ b/tcconfig/traffic_control.py @@ -60,7 +60,7 @@ def tc_target(self): @property def ifb_device(self): - return "ifb{:d}".format(self.__qdisc_major_id) + return f"ifb{self.__qdisc_major_id:d}" @property def direction(self): @@ -120,7 +120,7 @@ def qdisc_major_id(self): @property def qdisc_major_id_str(self): - return "{:x}".format(self.__qdisc_major_id) + return f"{self.__qdisc_major_id:x}" @property def ip_version(self): @@ -303,7 +303,7 @@ def delete_all_rules(self): try: self.iptables_ctrl.clear() except OSError as e: - logger.warning("{} (can not delete iptables entries)".format(e)) + logger.warning(f"{e} (can not delete iptables entries)") return any(result_list) @@ -316,14 +316,14 @@ def delete_tc(self): filter_param = rule_finder.find_filter_param() if not filter_param: - message = "shaping rule not found ({}).".format(rule_finder.get_filter_string()) + message = f"shaping rule not found ({rule_finder.get_filter_string()})." if rule_finder.is_empty_filter_condition(): message += " you can delete all of the shaping rules with --all option." logger.error(message) return 1 - logger.info("delete a shaping rule: {}".format(dict(filter_param))) + logger.info(f"delete a shaping rule: {dict(filter_param)}") filter_del_command = ( "{command:s} del dev {dev:s} protocol {protocol:s} " @@ -417,15 +417,15 @@ def __setup_ifb(self): "{:s} link set dev {:s} up".format(find_bin_path("ip"), self.ifb_device) ).run() - base_command = "{:s} add".format(get_tc_base_command(TcSubCommand.QDISC)) + base_command = f"{get_tc_base_command(TcSubCommand.QDISC):s} add" if self.is_add_shaping_rule or self.is_change_shaping_rule: notice_message = None else: notice_message = self.EXISTS_MSG_TEMPLATE.format( - "failed to '{:s}': ingress qdisc already exists.".format(base_command) + f"failed to '{base_command:s}': ingress qdisc already exists." ) return_code |= run_command_helper( - "{:s} dev {:s} ingress".format(base_command, self.device), + f"{base_command:s} dev {self.device:s} ingress", ignore_error_msg_regexp=self.REGEXP_FILE_EXISTS, notice_msg=notice_message, ) @@ -433,12 +433,12 @@ def __setup_ifb(self): return_code |= spr.SubprocessRunner( " ".join( [ - "{:s} add".format(get_tc_base_command(TcSubCommand.FILTER)), - "dev {:s}".format(self.device), - "parent ffff: protocol {:s} u32 match u32 0 0".format(self.protocol), - "flowid {:x}:".format(self.__qdisc_major_id), + f"{get_tc_base_command(TcSubCommand.FILTER):s} add", + f"dev {self.device:s}", + f"parent ffff: protocol {self.protocol:s} u32 match u32 0 0", + f"flowid {self.__qdisc_major_id:x}:", "action mirred egress redirect", - "dev {:s}".format(self.ifb_device), + f"dev {self.ifb_device:s}", ] ) ).run() @@ -446,7 +446,7 @@ def __setup_ifb(self): return return_code def __delete_qdisc(self): - logging_msg = "delete {} qdisc".format(self.device) + logging_msg = f"delete {self.device} qdisc" with logging_context(logging_msg): returncode = run_command_helper( @@ -472,7 +472,7 @@ def __delete_qdisc(self): return is_success def __delete_ingress_qdisc(self): - logging_msg = "delete {} ingress qdisc".format(self.device) + logging_msg = f"delete {self.device} ingress qdisc" with logging_context(logging_msg): returncode = run_command_helper( @@ -500,7 +500,7 @@ def __delete_ingress_qdisc(self): def __delete_ifb_device(self): from ._capabilities import get_permission_error_message, has_execution_authority - logging_msg = "delete {} ifb device ({})".format(self.device, self.ifb_device) + logging_msg = f"delete {self.device} ifb device ({self.ifb_device})" with logging_context(logging_msg): verify_network_interface(self.ifb_device, self.__tc_command_output) diff --git a/test/common.py b/test/common.py index 069eac7..194ed63 100644 --- a/test/common.py +++ b/test/common.py @@ -18,11 +18,11 @@ def print_test_result(expected, actual, error=None): if isinstance(expected, (dict, OrderedDict)): expected = json.dumps(expected, indent=4) - print("[expected]\n{}\n".format(expected)) + print(f"[expected]\n{expected}\n") if isinstance(actual, (dict, OrderedDict)): actual = json.dumps(actual, indent=4) - print("[actual]\n{}\n".format(actual)) + print(f"[actual]\n{actual}\n") if error: print(error, file=sys.stderr) @@ -31,7 +31,7 @@ def print_test_result(expected, actual, error=None): def is_invalid_param(rate, delay, packet_loss, packet_duplicate, corrupt, reordering): param_values = [packet_loss, packet_duplicate, corrupt, reordering] - print("rate={}, params={}".format(rate, param_values)) + print(f"rate={rate}, params={param_values}") is_invalid = all( [not RealNumber(param_value).is_type() or param_value <= 0 for param_value in param_values] @@ -52,7 +52,7 @@ def runner_helper(command): proc_runner = SubprocessRunner(command) proc_runner.run() - print("{}\n{}\n".format(proc_runner.command_str, proc_runner.stderr), file=sys.stderr) + print(f"{proc_runner.command_str}\n{proc_runner.stderr}\n", file=sys.stderr) assert proc_runner.returncode == 0 diff --git a/test/test_import_config.py b/test/test_import_config.py index 38366d7..1bb83b1 100644 --- a/test/test_import_config.py +++ b/test/test_import_config.py @@ -56,7 +56,7 @@ def test_smoke(self, tmpdir, device_value, overwrite): p = tmpdir.join("tcconfig.json") config = make_config(device_value) - print("[config]\n{}\n".format(config)) + print(f"[config]\n{config}\n") p.write(config) for device_option in [device_value]: @@ -85,7 +85,7 @@ def test_normal_tc_command(self, tmpdir, device_value): p = tmpdir.join("tcconfig.json") config = make_config(device_value) - print("[config]\n{}\n".format(config)) + print(f"[config]\n{config}\n") p.write(config) for device_option in [device_value]: diff --git a/test/test_iptables.py b/test/test_iptables.py index cfc8009..8393fdf 100644 --- a/test/test_iptables.py +++ b/test/test_iptables.py @@ -196,7 +196,7 @@ class Test_IptablesMangleMark_to_delete_command: "PREROUTING", "all", 1, - "{:s} -t mangle -D PREROUTING 1".format(get_iptables_base_command()), + f"{get_iptables_base_command():s} -t mangle -D PREROUTING 1", ], [ 20, @@ -205,7 +205,7 @@ class Test_IptablesMangleMark_to_delete_command: "OUTPUT", "all", 2, - "{:s} -t mangle -D OUTPUT 2".format(get_iptables_base_command()), + f"{get_iptables_base_command():s} -t mangle -D OUTPUT 2", ], ], ) @@ -306,7 +306,7 @@ def test_normal(self, iptables_ctrl_ipv4): assert iptables_ctrl_ipv4.add(mangle_mark) == 0 for lhs_mangle, rhs_mangle in zip(iptables_ctrl_ipv4.parse(), reverse_mangle_mark_list): - print("lhs: {}".format(lhs_mangle)) - print("rhs: {}".format(rhs_mangle)) + print(f"lhs: {lhs_mangle}") + print(f"rhs: {rhs_mangle}") assert lhs_mangle == rhs_mangle diff --git a/test/test_tcconfig.py b/test/test_tcconfig.py index b4f3414..db9f987 100644 --- a/test/test_tcconfig.py +++ b/test/test_tcconfig.py @@ -110,7 +110,7 @@ def test_smoke( if is_invalid_param(rate, delay, loss, corrupt): pytest.skip("skip null parameters") - for device_option in [device_value, "--device {}".format(device_value)]: + for device_option in [device_value, f"--device {device_value}"]: delete_all_rules(device_value) command = " ".join( @@ -129,7 +129,7 @@ def test_smoke( # is_enable_iptables, ] ) - print("command: {}".format(command)) + print(f"command: {command}") tcset_proc = SubprocessRunner(command) assert tcset_proc.run() == 0, tcset_proc.stderr diff --git a/test/test_tcdel.py b/test/test_tcdel.py index 3b65601..1de5aeb 100644 --- a/test/test_tcdel.py +++ b/test/test_tcdel.py @@ -118,7 +118,7 @@ def test_normal_ipv4(self, device_value): runner.run() expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": { @@ -180,7 +180,7 @@ def test_normal_ipv4(self, device_value): runner.run() expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": { @@ -218,7 +218,7 @@ def test_normal_ipv4(self, device_value): runner.run() expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": {}, @@ -325,7 +325,7 @@ def test_normal_ipv6(self, device_value): runner = SubprocessRunner(tcshow_cmd + ["--ipv6"]) expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": { @@ -390,7 +390,7 @@ def test_normal_ipv6(self, device_value): runner.run() expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": { @@ -430,7 +430,7 @@ def test_normal_ipv6(self, device_value): runner.run() expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": {}, diff --git a/test/test_tcset_change.py b/test/test_tcset_change.py index 5368762..afa8d40 100644 --- a/test/test_tcset_change.py +++ b/test/test_tcset_change.py @@ -34,7 +34,7 @@ def test_smoke_multiple(self, device_value): if device_value is None: pytest.skip("device is null") - for device_option in [device_value, "--device {}".format(device_value)]: + for device_option in [device_value, f"--device {device_value}"]: delete_all_rules(device_option) runner_helper( @@ -62,7 +62,7 @@ def test_normal(self, device_value): if device_value is None: pytest.skip("device is null") - for device_option in [device_value, "--device {}".format(device_value)]: + for device_option in [device_value, f"--device {device_value}"]: runner_helper( " ".join( [ @@ -89,10 +89,10 @@ def test_normal(self, device_value): ) ) - runner = SubprocessRunner("{:s} {:s}".format(Tc.Command.TCSHOW, device_option)) + runner = SubprocessRunner(f"{Tc.Command.TCSHOW:s} {device_option:s}") expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": { @@ -136,10 +136,10 @@ def test_normal(self, device_value): ) ) - runner = SubprocessRunner("{:s} {:s}".format(Tc.Command.TCSHOW, device_option)) + runner = SubprocessRunner(f"{Tc.Command.TCSHOW:s} {device_option:s}") expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": { diff --git a/test/test_tcset_dst_one.py b/test/test_tcset_dst_one.py index 9e7c861..a0d7e5e 100644 --- a/test/test_tcset_dst_one.py +++ b/test/test_tcset_dst_one.py @@ -84,8 +84,8 @@ def test_dst_net_uniform_latency( [ Tc.Command.TCSET, tc_target, - "--delay {}ms".format(delay), - "--shaping-algo {:s}".format(shaping_algo), + f"--delay {delay}ms", + f"--shaping-algo {shaping_algo:s}", ] ) ) @@ -132,9 +132,9 @@ def test_dst_net_latency_distro( Tc.Command.TCSET, tc_target, "--delay", - "{:d}ms".format(delay), + f"{delay:d}ms", "--delay-distro", - "{:d}ms".format(delay_distro), + f"{delay_distro:d}ms", ] ) ) @@ -202,9 +202,7 @@ def test_dst_net_packet_loss( without_tc_loss_rate = pingparser.parse(ping_result).packet_loss_rate # w/ traffic shaping --- - runner_helper( - " ".join([Tc.Command.TCSET, tc_target, "{:s} {:f}".format(option, value)]) - ) + runner_helper(" ".join([Tc.Command.TCSET, tc_target, f"{option:s} {value:f}"])) ping_result = transmitter.ping() assert ping_result.returncode == 0 @@ -234,7 +232,7 @@ def test_dst_net_packet_duplicate( without_tc_duplicate_rate = pingparser.parse(ping_result).packet_duplicate_rate # w/ packet duplicate tc --- - runner_helper(" ".join([Tc.Command.TCSET, tc_target, "{:s} {}".format(option, value)])) + runner_helper(" ".join([Tc.Command.TCSET, tc_target, f"{option:s} {value}"])) ping_result = transmitter.ping() assert ping_result.returncode == 0 @@ -262,7 +260,7 @@ def test_dst_net_exclude_dst_network( transmitter.destination = dst_host_option # w/ latency tc --- - runner_helper([Tc.Command.TCSET, tc_target, "--delay", "{:d}ms".format(delay)]) + runner_helper([Tc.Command.TCSET, tc_target, "--delay", f"{delay:d}ms"]) ping_result = transmitter.ping() assert ping_result.returncode == 0 @@ -277,7 +275,7 @@ def test_dst_net_exclude_dst_network( "--exclude-dst-network {:s}/24".format( ".".join(dst_host_option.split(".")[:3] + ["0"]) ), - "--delay {:d}ms".format(delay), + f"--delay {delay:d}ms", "--overwrite", ] ) diff --git a/test/test_tcset_dst_two.py b/test/test_tcset_dst_two.py index 88f9772..fcad90e 100644 --- a/test/test_tcset_dst_two.py +++ b/test/test_tcset_dst_two.py @@ -82,9 +82,9 @@ def test_network( command_list = [ Tc.Command.TCSET, tc_target, - "--delay {:d}ms".format(delay), - "--dst-network {:s}".format(dst_host_ex_option), - "--shaping-algo {:s}".format(shaping_algo), + f"--delay {delay:d}ms", + f"--dst-network {dst_host_ex_option:s}", + f"--shaping-algo {shaping_algo:s}", ] assert SubprocessRunner(" ".join(command_list)).run() == 0 diff --git a/test/test_tcset_src_one.py b/test/test_tcset_src_one.py index 595aadb..24cbe64 100644 --- a/test/test_tcset_src_one.py +++ b/test/test_tcset_src_one.py @@ -77,7 +77,7 @@ def test_src_net_uniform_latency( if typepy.is_null_string(dst_host_option): pytest.skip("destination host is null") - for tc_target in [device_option, "--device {}".format(device_option)]: + for tc_target in [device_option, f"--device {device_option}"]: delete_all_rules(tc_target) transmitter.destination = dst_host_option @@ -91,9 +91,9 @@ def test_src_net_uniform_latency( [ Tc.Command.TCSET, tc_target, - "--src-network {:s}".format(local_host_option), - "--delay {:d}ms".format(delay), - "--shaping-algo {:s}".format(shaping_algo), + f"--src-network {local_host_option:s}", + f"--delay {delay:d}ms", + f"--shaping-algo {shaping_algo:s}", ] ) ).run() @@ -105,8 +105,8 @@ def test_src_net_uniform_latency( # assertion --- rtt_diff = with_tc_rtt_avg - without_tc_rtt_avg - print("w/o tc rtt: {} ms".format(without_tc_rtt_avg)) - print("w/ tc rtt: {} ms".format(with_tc_rtt_avg)) + print(f"w/o tc rtt: {without_tc_rtt_avg} ms") + print(f"w/ tc rtt: {with_tc_rtt_avg} ms") assert rtt_diff > (delay * ASSERT_MARGIN) @@ -147,7 +147,7 @@ def test_src_net_uniform_latency( if typepy.is_null_string(dst_host_option): pytest.skip("destination host is null") - for tc_target in [device_option, "--device {}".format(device_option)]: + for tc_target in [device_option, f"--device {device_option}"]: delete_all_rules(tc_target) transmitter.destination = dst_host_option @@ -161,11 +161,11 @@ def test_src_net_uniform_latency( [ Tc.Command.TCSET, tc_target, - "--direction {:s}".format(TrafficDirection.INCOMING), - "--exclude-dst-network {:s}".format(local_host_option), - "--exclude-src-network {:s}".format(dst_host_option), - "--delay {:d}ms".format(delay), - "--shaping-algo {:s}".format(shaping_algo), + f"--direction {TrafficDirection.INCOMING:s}", + f"--exclude-dst-network {local_host_option:s}", + f"--exclude-src-network {dst_host_option:s}", + f"--delay {delay:d}ms", + f"--shaping-algo {shaping_algo:s}", ] ) ).run() @@ -177,8 +177,8 @@ def test_src_net_uniform_latency( # assertion --- rtt_diff = exclude_tc_rtt_avg - without_tc_rtt_avg - print("w/o tc rtt: {} ms".format(without_tc_rtt_avg)) - print("exclude tc rtt: {} ms".format(exclude_tc_rtt_avg)) + print(f"w/o tc rtt: {without_tc_rtt_avg} ms") + print(f"exclude tc rtt: {exclude_tc_rtt_avg} ms") assert rtt_diff < (delay / 10) diff --git a/test/test_tcshow.py b/test/test_tcshow.py index 3f580a3..843b6cd 100644 --- a/test/test_tcshow.py +++ b/test/test_tcshow.py @@ -30,14 +30,14 @@ def test_normal_empty(self, device_value): if device_value is None: pytest.skip("device option is null") - for tc_target in [device_value, "--device {}".format(device_value)]: + for tc_target in [device_value, f"--device {device_value}"]: delete_all_rules(tc_target) base_commands = [Tc.Command.TCSHOW, tc_target] runner = SubprocessRunner(" ".join(base_commands)) expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": { @@ -67,7 +67,7 @@ def test_normal_ipv4(self, device_value): if device_value is None: pytest.skip("device option is null") - for tc_target in [device_value, "--device {}".format(device_value)]: + for tc_target in [device_value, f"--device {device_value}"]: runner_helper( " ".join( [ @@ -156,7 +156,7 @@ def test_normal_ipv4(self, device_value): runner = SubprocessRunner(" ".join([Tc.Command.TCSHOW, tc_target])) expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": { @@ -210,7 +210,7 @@ def test_normal_ipv6(self, device_value): if device_value is None: pytest.skip("device option is null") - for tc_target in [device_value, "--device {}".format(device_value)]: + for tc_target in [device_value, f"--device {device_value}"]: runner_helper( " ".join( [ @@ -302,7 +302,7 @@ def test_normal_ipv6(self, device_value): runner = SubprocessRunner(" ".join([Tc.Command.TCSHOW, tc_target, "--ipv6"])) expected = ( "{" - + '"{:s}"'.format(device_value) + + f'"{device_value:s}"' + ": {" + """ "outgoing": {