-
Notifications
You must be signed in to change notification settings - Fork 866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KIP-430] DescribeConsumerGroups, DescribeTopics, DescribeCluster with authorized AclOperations #2021
[KIP-430] DescribeConsumerGroups, DescribeTopics, DescribeCluster with authorized AclOperations #2021
Changes from 13 commits
f9b72d2
6ff4931
c8c44f8
2b7e3ed
7874688
9545b10
82ace2a
ea33890
4dfcd5e
06b23bf
2ff8a56
43f20a0
3c33848
3f233b9
b4495d2
b08b0f7
9beae2d
c0a7b18
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -1,4 +1,5 @@ | ||||||||
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider | ||||||||
// Copyright 2015-2016 Andreas Heider, | ||||||||
// 2016-2023 Confluent Inc. | ||||||||
// | ||||||||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||
// you may not use this file except in compliance with the License. | ||||||||
|
@@ -546,23 +547,52 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm | |||||||
|
||||||||
static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs) | ||||||||
{ | ||||||||
if (commandArgs.Length < 1) | ||||||||
if (commandArgs.Length < 3) | ||||||||
{ | ||||||||
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <group1> [<group2 ... <groupN>]"); | ||||||||
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <username> <password> <include_authorized_operations> <group1> [<group2 ... <groupN>]"); | ||||||||
Environment.ExitCode = 1; | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
var groupNames = commandArgs.ToList(); | ||||||||
var username = commandArgs[0]; | ||||||||
var password = commandArgs[1]; | ||||||||
var includeAuthorizedOperations = (commandArgs[2] == "1"); | ||||||||
var groupNames = commandArgs.Skip(3).ToList(); | ||||||||
|
||||||||
if (string.IsNullOrWhiteSpace(username)) | ||||||||
{ | ||||||||
username = null; | ||||||||
} | ||||||||
if (string.IsNullOrWhiteSpace(password)) | ||||||||
{ | ||||||||
password = null; | ||||||||
} | ||||||||
|
||||||||
var timeout = TimeSpan.FromSeconds(30); | ||||||||
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) | ||||||||
var config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
}; | ||||||||
if (username != null && password != null) | ||||||||
{ | ||||||||
config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
SecurityProtocol = SecurityProtocol.SaslPlaintext, | ||||||||
SaslMechanism = SaslMechanism.Plain, | ||||||||
SaslUsername = username, | ||||||||
SaslPassword = password, | ||||||||
}; | ||||||||
} | ||||||||
|
||||||||
using (var adminClient = new AdminClientBuilder(config).Build()) | ||||||||
{ | ||||||||
try | ||||||||
{ | ||||||||
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout }); | ||||||||
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); | ||||||||
foreach (var group in descResult.ConsumerGroupDescriptions) | ||||||||
{ | ||||||||
Console.WriteLine($" Group: {group.GroupId} {group.Error}"); | ||||||||
Console.WriteLine($"\n Group: {group.GroupId} {group.Error}"); | ||||||||
Console.WriteLine($" Broker: {group.Coordinator}"); | ||||||||
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}"); | ||||||||
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}"); | ||||||||
|
@@ -579,6 +609,10 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] | |||||||
} | ||||||||
Console.WriteLine($" TopicPartitions: [{topicPartitions}]"); | ||||||||
} | ||||||||
if(includeAuthorizedOperations){ | ||||||||
string operations = string.Join(" ", group.AuthorizedOperations); | ||||||||
Console.WriteLine($" Authorized operations: {operations}"); | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
catch (KafkaException e) | ||||||||
|
@@ -757,6 +791,153 @@ await adminClient.AlterUserScramCredentialsAsync(alterations, | |||||||
} | ||||||||
} | ||||||||
|
||||||||
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations) | ||||||||
{ | ||||||||
foreach (var topic in topicDescriptions) | ||||||||
{ | ||||||||
Console.WriteLine($"\n Topic: {topic.Name} {topic.Error}"); | ||||||||
Console.WriteLine($" Partitions:"); | ||||||||
foreach (var partition in topic.Partitions) | ||||||||
{ | ||||||||
Console.WriteLine($" Partition ID: {partition.Partition} with leader: {partition.Leader}"); | ||||||||
if(!partition.ISR.Any()){ | ||||||||
Console.WriteLine(" There is no In-Sync-Replica broker for the partition"); | ||||||||
} | ||||||||
else{ | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
string isrs = string.Join("; ", partition.ISR); | ||||||||
Console.WriteLine($" The In-Sync-Replica brokers are: {isrs}"); | ||||||||
} | ||||||||
|
||||||||
if(!partition.Replicas.Any()){ | ||||||||
Console.WriteLine(" There is no Replica broker for the partition"); | ||||||||
} | ||||||||
else{ | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these are nits but
Suggested change
|
||||||||
string replicas = string.Join("; ", partition.Replicas); | ||||||||
Console.WriteLine($" The Replica brokers are: {replicas}"); | ||||||||
} | ||||||||
|
||||||||
} | ||||||||
Console.WriteLine($" Is internal: {topic.IsInternal}"); | ||||||||
if(includeAuthorizedOperations){ | ||||||||
string operations = string.Join(" ", topic.AuthorizedOperations); | ||||||||
Console.WriteLine($" Authorized operations: {operations}"); | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
static async Task DescribeTopicsAsync(string bootstrapServers, string[] commandArgs) { | ||||||||
if (commandArgs.Length < 3) | ||||||||
{ | ||||||||
Console.WriteLine("usage: .. <bootstrapServers> describe-topics <username> <password> <include_authorized_operations> <topic1> [<topic2 ... <topicN>]"); | ||||||||
Environment.ExitCode = 1; | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
var username = commandArgs[0]; | ||||||||
var password = commandArgs[1]; | ||||||||
var includeAuthorizedOperations = (commandArgs[2] == "1"); | ||||||||
if (string.IsNullOrWhiteSpace(username)) | ||||||||
{ | ||||||||
username = null; | ||||||||
} | ||||||||
if (string.IsNullOrWhiteSpace(password)) | ||||||||
{ | ||||||||
password = null; | ||||||||
} | ||||||||
var topicNames = commandArgs.Skip(3).ToList(); | ||||||||
|
||||||||
var timeout = TimeSpan.FromSeconds(30); | ||||||||
var config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
}; | ||||||||
if (username != null && password != null) | ||||||||
{ | ||||||||
config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
SecurityProtocol = SecurityProtocol.SaslPlaintext, | ||||||||
SaslMechanism = SaslMechanism.Plain, | ||||||||
SaslUsername = username, | ||||||||
SaslPassword = password, | ||||||||
}; | ||||||||
} | ||||||||
|
||||||||
using (var adminClient = new AdminClientBuilder(config).Build()) | ||||||||
{ | ||||||||
try | ||||||||
{ | ||||||||
var descResult = await adminClient.DescribeTopicsAsync( | ||||||||
TopicCollection.OfTopicNames(topicNames), | ||||||||
new DescribeTopicsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); | ||||||||
PrintTopicDescriptions(descResult.TopicDescriptions, includeAuthorizedOperations); | ||||||||
} | ||||||||
catch (DescribeTopicsException e) | ||||||||
{ | ||||||||
// At least one TopicDescription will have an error. | ||||||||
PrintTopicDescriptions(e.Results.TopicDescriptions, includeAuthorizedOperations); | ||||||||
} | ||||||||
catch (KafkaException e) | ||||||||
{ | ||||||||
Console.WriteLine($"An error occurred describing topics: {e}"); | ||||||||
Environment.ExitCode = 1; | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
static async Task DescribeClusterAsync(string bootstrapServers, string[] commandArgs) { | ||||||||
if (commandArgs.Length < 3) | ||||||||
{ | ||||||||
Console.WriteLine("usage: .. <bootstrapServers> describe-cluster <username> <password> <include_authorized_operations>"); | ||||||||
Environment.ExitCode = 1; | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
var username = commandArgs[0]; | ||||||||
var password = commandArgs[1]; | ||||||||
var includeAuthorizedOperations = (commandArgs[2] == "1"); | ||||||||
|
||||||||
var timeout = TimeSpan.FromSeconds(30); | ||||||||
var config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
}; | ||||||||
if (username != null && password != null) | ||||||||
{ | ||||||||
config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
SecurityProtocol = SecurityProtocol.SaslPlaintext, | ||||||||
SaslMechanism = SaslMechanism.Plain, | ||||||||
SaslUsername = username, | ||||||||
SaslPassword = password, | ||||||||
}; | ||||||||
} | ||||||||
|
||||||||
using (var adminClient = new AdminClientBuilder(config).Build()) | ||||||||
{ | ||||||||
try | ||||||||
{ | ||||||||
var descResult = await adminClient.DescribeClusterAsync(new DescribeClusterOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); | ||||||||
|
||||||||
Console.WriteLine($" Cluster Id: {descResult.ClusterId}\n Controller: {descResult.Controller}"); | ||||||||
Console.WriteLine(" Nodes:"); | ||||||||
foreach(var node in descResult.Nodes){ | ||||||||
Console.WriteLine($" {node}"); | ||||||||
} | ||||||||
if(includeAuthorizedOperations){ | ||||||||
string operations = string.Join(" ", descResult.AuthorizedOperations); | ||||||||
Console.WriteLine($" Authorized operations: {operations}"); | ||||||||
} | ||||||||
} | ||||||||
catch (KafkaException e) | ||||||||
{ | ||||||||
Console.WriteLine($"An error occurred describing cluster: {e}"); | ||||||||
Environment.ExitCode = 1; | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
public static async Task Main(string[] args) | ||||||||
{ | ||||||||
if (args.Length < 2) | ||||||||
|
@@ -768,8 +949,8 @@ public static async Task Main(string[] args) | |||||||
"list-consumer-groups", "describe-consumer-groups", | ||||||||
"list-consumer-group-offsets", "alter-consumer-group-offsets", | ||||||||
"incremental-alter-configs", "describe-user-scram-credentials", | ||||||||
"alter-user-scram-credentials" | ||||||||
|
||||||||
"alter-user-scram-credentials", "describe-topics", | ||||||||
"describe-cluster" | ||||||||
}) + | ||||||||
" .."); | ||||||||
Environment.ExitCode = 1; | ||||||||
|
@@ -824,6 +1005,12 @@ public static async Task Main(string[] args) | |||||||
case "alter-user-scram-credentials": | ||||||||
await AlterUserScramCredentialsAsync(bootstrapServers, commandArgs); | ||||||||
break; | ||||||||
case "describe-topics": | ||||||||
await DescribeTopicsAsync(bootstrapServers, commandArgs); | ||||||||
break; | ||||||||
case "describe-cluster": | ||||||||
await DescribeClusterAsync(bootstrapServers, commandArgs); | ||||||||
break; | ||||||||
default: | ||||||||
Console.WriteLine($"unknown command: {command}"); | ||||||||
break; | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
// Copyright 2022 Confluent Inc. | ||
// Copyright 2022-2023 Confluent Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
|
@@ -15,6 +15,8 @@ | |
// Refer to LICENSE for more information. | ||
|
||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Text; | ||
|
||
namespace Confluent.Kafka.Admin | ||
{ | ||
|
@@ -58,5 +60,38 @@ public class ConsumerGroupDescription | |
/// Members list. | ||
/// </summary> | ||
public List<MemberDescription> Members { get; set; } | ||
|
||
/// <summary> | ||
/// AclOperation list. | ||
/// </summary> | ||
public List<AclOperation> AuthorizedOperations { get; set; } | ||
|
||
/// <summary> | ||
/// Returns a JSON representation of this object. | ||
/// </summary> | ||
/// <returns> | ||
/// A JSON representation of this object. | ||
/// </returns> | ||
public override string ToString() | ||
{ | ||
var result = new StringBuilder(); | ||
var members = string.Join(",", | ||
Members.Select(member => | ||
member.ToString() | ||
).ToList()); | ||
var authorizedOperations = string.Join(",", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add handling for if AuthorizedOperations is null |
||
AuthorizedOperations.Select(authorizedOperation => | ||
authorizedOperation.ToString().Quote() | ||
).ToList()); | ||
|
||
result.Append($"{{\"GroupId\": {GroupId.Quote()}"); | ||
result.Append($", \"Error\": \"{Error.Code}\", \"IsSimpleConsumerGroup\": {IsSimpleConsumerGroup.Quote()}"); | ||
result.Append($", \"PartitionAssignor\": {PartitionAssignor.Quote()}, \"State\": {State.ToString().Quote()}"); | ||
result.Append($", \"Coordinator\": {Coordinator?.ToString() ?? "null"}, \"Members\": [{members}]"); | ||
result.Append($", \"AuthorizedOperations\": [{authorizedOperations}]}}"); | ||
|
||
return result.ToString(); | ||
} | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
// Copyright 2023 Confluent Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// | ||
// Refer to LICENSE for more information. | ||
|
||
using System; | ||
|
||
|
||
namespace Confluent.Kafka.Admin | ||
{ | ||
/// <summary> | ||
/// Options for the "IAdminClient.DescribeCluster" method. | ||
/// </summary> | ||
public class DescribeClusterOptions | ||
{ | ||
/// <summary> | ||
/// The overall request timeout, including broker lookup, request | ||
/// transmission, operation time on broker, and response. If set | ||
/// to null, the default request timeout for the AdminClient will | ||
/// be used. | ||
/// | ||
/// Default: null | ||
/// </summary> | ||
public TimeSpan? RequestTimeout { get; set; } | ||
|
||
/// <summary> | ||
/// Decides if the broker should return cluster authorized operations. | ||
/// | ||
/// Default: false | ||
/// </summary> | ||
public bool IncludeAuthorizedOperations { get; set; } = false; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: stick to the formatting norms of the brace on next line, here and elsewhere in this file