Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DP-1762] - topicctl get actions for under replicated and offline partitions #149

Closed

Conversation

ssingudasu
Copy link
Contributor

@ssingudasu ssingudasu commented Sep 15, 2023

Description

This PR is to add new actions for topicctl get

Expectations:

  • topicctl get under-replicated-partitions (optional --topics flag)
  • topicctl get offline-partitions (optional --topics flag)
  • Fix the debug PersistentPreRunE: getPreRun in topicctl get since topicctl get is overriding the root cmd PersistentPreRunE: preRun

under replicated partitions

  • Kafka cluster is in under replicated state if the number of ISR are less than the Replicas available for the partition
  • topicctl already has readily available metadata call with filtered information. We will leverage topicctl admin pkg for under-replicated-partitions

offline partitions

  • topicctl has admin.GetTopics which performs kafka Metadata call, there is a lot of formatting and missing information in admin.GetTopics. Hence we will perform the kafka-go client.Metadata to ensure we capture all the offline partition information.
  • We leverage kafka-go metadata for offline-partitions

kafka-go metadata call for offline partitions gives the Leader partition information as below:

kafka "github.com/segmentio/kafka-go"

partition:

{
	Topic: topic_name
	ID: partition_id
	Leader: kafka.Broker (refer below)
	Replicas: [][Broker
	Isr:  [][Broker
	Error: kafka.Error
}

partition.Leader Broker response:

kafka.Broker{
        Host: ""
	Port: 0
	ID: 0
	Rack: ""
}

Unlike sarama which gives the offline leader broker id as -1, kafka-go gives the offline broker id as 0

Hence we will check the below error code: ListenerNotFound along with broker.Host as "" and broker.Port as 0 for partitions information in the metadata response

Expectation: if kafka.Error is ListenerNotFound and there is no broker or port found for leader partition, there by confirming that the partition is offline. We will display the ISR.ID and Repicas.ID with information from the kafka-go metadata call (i.e: each offline partition - Isr.ID and Replicas.ID as 0)

@ssingudasu ssingudasu requested a review from a team as a code owner September 15, 2023 04:24
Copy link
Contributor

@petedannemann petedannemann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good start!

func init() {
log.SetFormatter(&prefixed.TextFormatter{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log format and the debug flag should already be getting set by the root command that these subcommands are added to

RootCmd.PersistentFlags().BoolVar(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, we are overriding them, nevermind

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove some duplicate code by adding this to the get subcommands prerun

rootCmd.PersistentPreRun(cmd, args)

spf13/cobra#252 (comment)

urpCommand := &cobra.Command{
Use: "under-replicated-partitions",
Short: "Fetch all under replicated partitions in the kafka cluster.",
Args: cobra.MinimumNArgs(0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want cobra.NoArgs, right?

opCommand := &cobra.Command{
Use: "offline-partitions",
Short: "Fetch all offline partitions in the kafka cluster.",
Args: cobra.MinimumNArgs(0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want cobra.NoArgs, right?


type TopicOPsInfo struct {
Name string `json:"name"`
Partitions []kafka.Partition `json:"partitions"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be converting []kafka.Partition into []PartitionInfo for consistency

c.startSpinner()
urpsFound := false

filterTopics := make(map[string]bool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this file we usually put all the business logic into a method on the adminClient. This allows us to have different implementations for Kafka and Zookeeper. I think this would lead to some strange errors if someone tried to use this with Zookeeper right now. If we move this into a method on the adminClient we have ways to mark methods as unsupported and there will be consistent error messaging around that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it would look something like this for Kafka. The Zookeeper implementation would return nil and we'd find some way to mark this as unsupported using our GetSupportedFeatures

@petedannemann
Copy link
Contributor

Could we document these new commands in the README with the other get commands?

@@ -136,6 +137,16 @@ type zkChangeNotification struct {
EntityPath string `json:"entity_path"`
}

type TopicURPsInfo struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more I think about it, do we even need these new types? Couldn't we just reuse TopicInfo and leave the unused fields blank?

Comment on lines +35 to +45
type urpCmdConfig struct {
topics []string
}

var urpConfig urpCmdConfig

type opCmdConfig struct {
topics []string
}

var opConfig urpCmdConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using abbreviations for under-replicated-partitions and offline-partitions breaks the pattern used everywhere else. Even within these two new commands, it's not consistent: you have urpCmdConfg, underReplicatedPartitionsCmd, GetUnderReplicatedPartitions, urpCommand, FormatURPs, etc.

I realize they are long but I think it's better not to use abbreviations and be more consistent.

@@ -44,6 +62,12 @@ func init() {
false,
"Sort by value instead of name; only applies for lags at the moment",
)
getCmd.PersistentFlags().BoolVar(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--debug is noted as being a "global flag". Isn't this already set somewhere higher up the stack?

func underReplicatedPartitionsCmd() *cobra.Command {
urpCommand := &cobra.Command{
Use: "under-replicated-partitions",
Short: "Fetch all under replicated partitions in the kafka cluster.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The term "under-replicated partitions", when output, is inconsistently written. I would suggest "under-replicated partitions" (with the hyphen between under and replicated, and not capitalized) because that is most grammatically correct IMO.

"topics",
"t",
[]string{},
"under replicated partitions for the topics",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

under-replicated partitions

c.printer("Under Replicated Partitions:\n%s", admin.FormatURPs(allTopicURPs))

if urpsFound {
return fmt.Errorf("Topics have Under Replicated Partitions")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

under-replicated partitions


c.printer("Under Replicated Partitions:\n%s", admin.FormatURPs(allTopicURPs))

if urpsFound {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is urpsFound necessary? Can't you use len(allTopicURPs)>0?


c.printer("Offline Partitions:\n%s", admin.FormatOPs(allTopicOPs))

if opsFound {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above, is opsFound necessary versus len(allTopicOPs)>0?

// If ListenerNotFound Error occurs for leader partition
// this means the partition is offline
var listenerNotFoundError kafka.Error
listenerNotFoundError = 72
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange to see a hard coded value here. Can't it be referenced by name?

@ssingudasu
Copy link
Contributor Author

Closing this PR as we have decided to implement the same in

# topicctl get partitions

@ssingudasu ssingudasu closed this Sep 20, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants