Skip to content

Commit

Permalink
tests/rpk: refactored parsing group describe
Browse files Browse the repository at this point in the history
Refactored `group_describe` to use `parse_rpk_table`

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Oct 5, 2023
1 parent 0459a51 commit 15db057
Showing 1 changed file with 105 additions and 87 deletions.
192 changes: 105 additions & 87 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,71 +573,10 @@ def group_seek_to(self, group, to):
self._run_group(cmd)

def group_describe(self, group, summary=False):
def parse_field(field_name, string):
pattern = re.compile(f" *{field_name} +(?P<value>.+)")
m = pattern.match(string)
assert m is not None, f"Field string '{string}' does not match the pattern"
return m['value']

static_member_pattern = re.compile("^([^\s]+\s+){8}[^\s]+$")

partition_pattern_static_member = re.compile(
"(?P<topic>[^\s]+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<instance_id>[^\s]*) *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
)
partition_pattern_dynamic_member = re.compile(
"(?P<topic>[^\s]+) +(?P<partition>\d+) +(?P<offset>\d+|-) +(?P<log_end>\d+|-) +(?P<lag>-?\d+|-) *(?P<member_id>[^\s]*)? *(?P<client_id>[^\s]*)? *(?P<host>[^\s]*)?"
)

def check_lines(lines):
for line in lines:
# UNKNOWN_TOPIC_OR_PARTITION: This server doesn't contain this partition or topic.
# We should wait until server will get information about it.
if line.find('UNKNOWN_TOPIC_OR_PARTITION') != -1:
return False

# Leadership movements are underway
if 'NOT_LEADER_FOR_PARTITION' in line:
return False

# Cluster not ready yet
if 'unknown broker' in line:
return False

if "missing from list offsets" in line:
return False

return True

def parse_partition(string):

pattern = partition_pattern_dynamic_member
if static_member_pattern.match(string):
pattern = partition_pattern_static_member
m = pattern.match(string)

# Check to see if info for the partition was queried during a change in leadership.
# if it was we'd expect to see a partition string ending in;
# NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
if m is None and string.find('NOT_LEADER_FOR_PARTITION') != -1:
return None

# Account for negative numbers and '-' value
all_digits = lambda x: x.lstrip('-').isdigit()

offset = int(m['offset']) if all_digits(m['offset']) else None
log_end = int(m['log_end']) if all_digits(m['log_end']) else None
lag = int(m['lag']) if all_digits(m['lag']) else None

return RpkGroupPartition(topic=m['topic'],
partition=int(m['partition']),
current_offset=offset,
log_end_offset=log_end,
lag=lag,
member_id=m['member_id'],
instance_id=m.groupdict().get(
'instance_id', None),
client_id=m['client_id'],
host=m['host'])
def try_parse_field(l: str):
m = re.match(f"^\s*(?P<name>[^\s]+) +(?P<value>[^\s]*)$", l)
return (m['name'].strip(),
m['value'].strip()) if m is not None else None

def try_describe_group(group):
if summary:
Expand Down Expand Up @@ -666,35 +605,114 @@ def try_describe_group(group):
raise

lines = out.splitlines()
group_name = None
coordinator = -1
state = None
balancer = None
members = None
total_lag = None
last_field = 0
for idx, l in enumerate(lines):
field = try_parse_field(l)

if field is None:
break
name, value = field

if name == "GROUP":
group_name = value
elif name == "COORDINATOR":
coordinator = int(value)
elif name == "STATE":
state = value
elif name == "BALANCER":
balancer = value
elif name == "MEMBERS":
members = int(value)
elif name == "TOTAL-LAG":
total_lag = int(value)
else:
assert False, f"unknown group describe field {name}={value}"
last_field = idx

if not check_lines(lines):
return None

group_name = parse_field("GROUP", lines[0])
coordinator = parse_field("COORDINATOR", lines[1])
state = parse_field("STATE", lines[2])
balancer = parse_field("BALANCER", lines[3])
members = parse_field("MEMBERS", lines[4])
total_lag = parse_field("TOTAL-LAG", lines[5])
partitions = []
for l in lines[6:]:
#skip header line
if l.startswith("TOPIC") or len(l) < 2:
continue
p = parse_partition(l)
# p is None if the leader of the partition has changed.
if p is None:
return None
table_lines = [l for l in lines[last_field + 1:] if l != ""]

if len(table_lines) == 0:
return RpkGroup(name=group_name,
coordinator=coordinator,
state=state,
balancer=balancer,
members=members,
total_lag=total_lag,
partitions=[])

table = parse_rpk_table('\n'.join(table_lines))

required_columns = {
"TOPIC",
"PARTITION",
"CURRENT-OFFSET",
"LOG-END-OFFSET",
"LAG",
}

optional_columns = {
"INSTANCE-ID",
"MEMBER-ID",
"CLIENT-ID",
"HOST",
}

for column in table.columns:
if column.name in required_columns:
required_columns.remove(column.name)
elif column.name in optional_columns:
optional_columns.remove(column.name)
else:
self._redpanda.logger.error(
f"Unexpected column: {column.name}")
raise RpkException(f"Unexpected column: {column.name}")

if len(required_columns) > 0:
raise RpkException(
f"Missing required columns: {required_columns}")

def int_or_none(value):
m = re.match("^-?\d+$", value)
if m:
return int(value)
return None

partitions.append(p)
def get_optional_column(row_dict, name):
return row_dict[name] if name in row_dict else None

for r in table.rows:
row_dict = {}
for column, field in zip([c.name for c in table.columns], r):
row_dict[column] = field

partitions.append(
RpkGroupPartition(
topic=row_dict["TOPIC"],
partition=int_or_none(row_dict["PARTITION"]),
current_offset=int_or_none(row_dict["CURRENT-OFFSET"]),
log_end_offset=int_or_none(row_dict["LOG-END-OFFSET"]),
lag=int_or_none(row_dict["LAG"]),
member_id=get_optional_column(row_dict, "MEMBER-ID"),
instance_id=get_optional_column(
row_dict, "INSTANCE-ID"),
client_id=get_optional_column(row_dict, "CLIENT-ID"),
host=get_optional_column(row_dict, "HOST"),
))

return RpkGroup(name=group_name,
coordinator=int(coordinator),
coordinator=coordinator,
state=state,
balancer=balancer,
members=int(members),
partitions=partitions,
total_lag=int(total_lag))
members=members,
total_lag=total_lag,
partitions=partitions)

attempts = 10
rpk_group = None
Expand Down

0 comments on commit 15db057

Please sign in to comment.