Skip to content

Commit

Permalink
Get /topics now retrieve partitions details
Browse files Browse the repository at this point in the history
  • Loading branch information
Vashnak authored and luphaz committed Oct 4, 2018
1 parent 7dd5d4d commit 71d9151
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.greencomnetworks.franzmanager.entities;

import org.apache.kafka.common.TopicPartitionInfo;

import java.util.stream.Collectors;

public class Partition {
private String topic;
private int partition;
Expand All @@ -21,6 +25,17 @@ public Partition(String topic, int partition, long beginningOffset, long endOffs
this.offlineReplicas = offlineReplicas;
}

public Partition(String topic, long beginningOffset, long endOffset, TopicPartitionInfo topicPartitionInfo, int[] offlineReplicas) {
this.topic = topic;
this.partition = topicPartitionInfo.partition();
this.beginningOffset = beginningOffset;
this.endOffset = endOffset;
this.leader = topicPartitionInfo.leader().id();
this.replicas = topicPartitionInfo.replicas().stream().mapToInt(replicaNode -> replicaNode.id()).toArray();
this.inSyncReplicas = topicPartitionInfo.isr().stream().mapToInt(isrNode -> isrNode.id()).toArray();
this.offlineReplicas = offlineReplicas;
}

public int getLeader() {
return leader;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,28 @@
package com.greencomnetworks.franzmanager.entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Map;

public class Topic {
public final String id;
@JsonInclude(JsonInclude.Include.NON_NULL)
public final Integer partitions;
@JsonInclude(JsonInclude.Include.NON_NULL)
public final Integer replications;
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<Partition> partitions;
public final Map<String, String> configurations;

@JsonCreator
public Topic(@JsonProperty(value="id", required=true) String id,
@JsonProperty("partitions") Integer partitions,
@JsonProperty("replication") Integer replications,
@JsonProperty("configurations") Map<String, String> configurations) {
public Topic(String id, List<Partition> partitions, Map<String, String> configurations) {
this.id = id;
this.partitions = partitions;
this.replications = replications;
this.configurations = configurations;
}

public Topic(String id,
Integer partitions,
Integer replications) {
this.id = id;
this.partitions = partitions;
this.replications = replications;
this.configurations = null;
}

public Topic(String id) {
this.id = id;
this.partitions = null;
this.replications = null;
this.configurations = null;
}

@Override
public String toString() {
return "Topic: " + id;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.greencomnetworks.franzmanager.entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Map;

public class TopicCreation {
public final String id;
@JsonInclude(JsonInclude.Include.NON_NULL)
public final Integer partitions;
@JsonInclude(JsonInclude.Include.NON_NULL)
public final Integer replications;
@JsonInclude(JsonInclude.Include.NON_NULL)
public final Map<String, String> configurations;

@JsonCreator
public TopicCreation(@JsonProperty(value = "id", required = true) String id,
@JsonProperty("partitions") Integer partitions,
@JsonProperty("replication") Integer replications,
@JsonProperty("configurations") Map<String, String> configurations) {
this.id = id;
this.partitions = partitions;
this.replications = replications;
this.configurations = configurations;
}

@Override
public String toString() {
return "Topic: " + id;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package com.greencomnetworks.franzmanager.resources;

import com.greencomnetworks.franzmanager.core.ConflictException;
import com.greencomnetworks.franzmanager.entities.Broker;
import com.greencomnetworks.franzmanager.entities.Cluster;
import com.greencomnetworks.franzmanager.entities.Partition;
import com.greencomnetworks.franzmanager.entities.Topic;
import com.greencomnetworks.franzmanager.entities.*;
import com.greencomnetworks.franzmanager.services.AdminClientService;
import com.greencomnetworks.franzmanager.services.ConstantsService;
import com.greencomnetworks.franzmanager.utils.FUtils;
Expand Down Expand Up @@ -40,16 +37,16 @@ public class TopicsResource {
Cluster cluster;
AdminClient adminClient;

public TopicsResource(@HeaderParam("clusterId") String clusterId){
public TopicsResource(@HeaderParam("clusterId") String clusterId) {
this.clusterId = clusterId;
this.adminClient = AdminClientService.getAdminClient(this.clusterId);
for (Cluster cluster : ConstantsService.clusters) {
if(StringUtils.equals(cluster.name, clusterId)){
if (StringUtils.equals(cluster.name, clusterId)) {
this.cluster = cluster;
break;
}
}
if(this.cluster == null){
if (this.cluster == null) {
throw new NotFoundException("Cluster not found for id " + clusterId);
}
}
Expand Down Expand Up @@ -84,14 +81,16 @@ public List<Topic> getTopics(@QueryParam("idOnly") boolean idOnly, @QueryParam("
TopicDescription describedTopic = describedTopics.get(topicName);

Map<String, String> configurations = null;
if(!shortVersion) {
if (!shortVersion) {
configurations = entry.getValue().entries().stream()
.collect(Collectors.toMap(
ConfigEntry::name,
ConfigEntry::value
));
}
return new Topic(topicName, describedTopic.partitions().size(), describedTopic.partitions().get(0).replicas().size(), configurations);
List<Partition> topicPartitions = describedTopic.partitions().stream()
.map(topicPartitionInfo -> new Partition(topicName, -1, -1, topicPartitionInfo, null)).collect(Collectors.toList());
return new Topic(topicName, topicPartitions, configurations);
}).collect(Collectors.toList());

return completeTopics;
Expand All @@ -103,7 +102,7 @@ public List<Topic> getTopics(@QueryParam("idOnly") boolean idOnly, @QueryParam("
}

@POST
public Response createTopic(Topic topic) {
public Response createTopic(TopicCreation topic) {
if (topicExist(topic.id)) {
throw new ConflictException("This topic (" + topic.id + ") already exist.");
}
Expand Down Expand Up @@ -147,13 +146,15 @@ public Topic getTopic(@PathParam("topicId") String topicId) {
ConfigEntry::value
));

return new Topic(topicId, topicDescription.partitions().size(), topicDescription.partitions().get(0).replicas().size(), configurations);
List<Partition> topicPartitions = topicDescription.partitions().stream()
.map(topicPartitionInfo -> new Partition(topicId, -1, -1, topicPartitionInfo, null)).collect(Collectors.toList());
return new Topic(topicId, topicPartitions, configurations);
}

@PUT
@Path("/{topicId}")
public Response updateTopicConfig(@PathParam("topicId") String topicId, Map<String, String> configurations) {
if(!topicExist(topicId)) {
if (!topicExist(topicId)) {
throw new NotFoundException("This topic (" + topicId + ") doesn't exist.");
}

Expand Down Expand Up @@ -193,7 +194,7 @@ public Response deleteTopic(@PathParam("topicId") String topicId) {
@GET
@Path("/{topicId}/partitions")
public List<Partition> getTopicPartitions(@PathParam("topicId") String topicId) {
if(!topicExist(topicId)) {
if (!topicExist(topicId)) {
throw new NotFoundException("This topic (" + topicId + ") doesn't exist.");
}

Expand All @@ -203,7 +204,7 @@ public List<Partition> getTopicPartitions(@PathParam("topicId") String topicId)
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config, deserializer, deserializer);
try {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicId);
List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi->new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
Map<TopicPartition, Long> offsetsBeginning = consumer.beginningOffsets(topicPartitions);
Map<TopicPartition, Long> offsetsEnd = consumer.endOffsets(topicPartitions);

Expand All @@ -221,12 +222,12 @@ public List<Partition> getTopicPartitions(@PathParam("topicId") String topicId)
@Path("/{topicId}/partitions")
public Response postTopicPartitions(@PathParam("topicId") String topicId, @QueryParam("quantity") Integer quantity) {
TopicDescription topicDescription = KafkaUtils.describeTopic(adminClient, topicId);
if(topicDescription == null) {
if (topicDescription == null) {
throw new NotFoundException("This topic (" + topicId + ") doesn't exist.");
}

try {
Map<String, NewPartitions> newPartitions = FUtils.Map.of(topicId, NewPartitions.increaseTo(topicDescription.partitions().size() + quantity));
Map<String, NewPartitions> newPartitions = FUtils.Map.of(topicId, NewPartitions.increaseTo(topicDescription.partitions().size() + quantity));
adminClient.createPartitions(newPartitions).all().get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit 71d9151

Please sign in to comment.