Skip to content

Commit

Permalink
Resolve #25: Quotas implementation, only quotas based on user principal
Browse files Browse the repository at this point in the history
  • Loading branch information
purbon authored and Marcos Martinez Cedillo committed Nov 2, 2021
1 parent b4171b3 commit bf8212a
Show file tree
Hide file tree
Showing 11 changed files with 626 additions and 2 deletions.
70 changes: 70 additions & 0 deletions src/main/java/com/purbon/kafka/topology/model/users/Quota.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.purbon.kafka.topology.model.users;

import java.util.Optional;

/**
 * Client quota definition bound to a user principal.
 *
 * <p>Each rate is optional; an absent value means that limit is not configured for the
 * principal. Accessor names deliberately keep the snake_case Kafka quota key spelling
 * (e.g. {@code producer_byte_rate}) so the serialized property names stay stable.
 */
public class Quota {

  // Principal the quota applies to; null until one is assigned.
  private String principal;
  // Maximum produced bytes per second, when present.
  private Optional<Double> producer_byte_rate = Optional.empty();
  // Maximum consumed bytes per second, when present.
  private Optional<Double> consumer_byte_rate = Optional.empty();
  // Maximum request-handler utilization percentage, when present.
  private Optional<Double> request_percentage = Optional.empty();

  /** Creates an empty quota: no principal and no limits set. */
  public Quota() {}

  /** Creates a quota for {@code principal} with all three limits supplied explicitly. */
  public Quota(
      String principal,
      Optional<Double> producer_byte_rate,
      Optional<Double> consumer_byte_rate,
      Optional<Double> request_percentage) {
    this.principal = principal;
    this.producer_byte_rate = producer_byte_rate;
    this.consumer_byte_rate = consumer_byte_rate;
    this.request_percentage = request_percentage;
  }

  /** Creates a quota with byte-rate limits only; the request percentage stays absent. */
  public Quota(
      String principal, Optional<Double> producer_byte_rate, Optional<Double> consumer_byte_rate) {
    this(principal, producer_byte_rate, consumer_byte_rate, Optional.empty());
  }

  public String getPrincipal() {
    return principal;
  }

  public void setPrincipal(String principal) {
    this.principal = principal;
  }

  public Optional<Double> getProducer_byte_rate() {
    return producer_byte_rate;
  }

  public void setProducer_byte_rate(Optional<Double> producer_byte_rate) {
    this.producer_byte_rate = producer_byte_rate;
  }

  public Optional<Double> getConsumer_byte_rate() {
    return consumer_byte_rate;
  }

  public void setConsumer_byte_rate(Optional<Double> consumer_byte_rate) {
    this.consumer_byte_rate = consumer_byte_rate;
  }

  public Optional<Double> getRequest_percentage() {
    return request_percentage;
  }

  public void setRequest_percentage(Optional<Double> request_percentage) {
    this.request_percentage = request_percentage;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.purbon.kafka.topology.quotas;

import com.purbon.kafka.topology.model.users.Quota;
import java.util.*;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

/**
 * Translates a {@link Quota} model object into a Kafka {@link ClientQuotaAlteration} that can be
 * submitted through the AdminClient.
 *
 * <p>An Op whose value is null asks the broker to remove that quota key, which is why the
 * per-limit builders pass {@code orElse(null)} through from the model's Optionals.
 */
public class QuotasClientBindingsBuilder {

  private static final String CONSUMER_BYTE_RATE = "consumer_byte_rate";
  private static final String PRODUCER_BYTE_RATE = "producer_byte_rate";
  private static final String REQUEST_RATE = "request_percentage";

  private final Quota quota;

  public QuotasClientBindingsBuilder(Quota quota) {
    this.quota = quota;
  }

  /**
   * Builds the quota entity targeted at the quota's user principal. A null or empty principal
   * maps to the default user entity (a null entry for {@link ClientQuotaEntity#USER}).
   */
  private ClientQuotaEntity buildClient() {
    Map<String, String> entries = new HashMap<>();
    entries.put(ClientQuotaEntity.USER, null);
    // Guard against a null principal (the Quota no-arg constructor leaves it null) before the
    // emptiness check; the original call chain would have thrown an NPE here.
    String principal = quota.getPrincipal();
    if (principal != null && !principal.isEmpty()) {
      entries.put(ClientQuotaEntity.USER, principal);
    }
    return new ClientQuotaEntity(entries);
  }

  /** Op for the consumer byte-rate limit; a null value means "remove this limit". */
  private ClientQuotaAlteration.Op addConsumer() {
    return new ClientQuotaAlteration.Op(
        CONSUMER_BYTE_RATE, quota.getConsumer_byte_rate().orElse(null));
  }

  /** Op for the producer byte-rate limit; a null value means "remove this limit". */
  private ClientQuotaAlteration.Op addProducer() {
    return new ClientQuotaAlteration.Op(
        PRODUCER_BYTE_RATE, quota.getProducer_byte_rate().orElse(null));
  }

  /** Op for the request-percentage limit; a null value means "remove this limit". */
  private ClientQuotaAlteration.Op addRequestRate() {
    return new ClientQuotaAlteration.Op(REQUEST_RATE, quota.getRequest_percentage().orElse(null));
  }

  /** Assembles the alteration: the target entity plus one Op per quota key. */
  public ClientQuotaAlteration build() {
    ClientQuotaEntity entityQuota = buildClient();
    // Parameterized list instead of the original raw List/ArrayList.
    List<ClientQuotaAlteration.Op> ops = new ArrayList<>();
    CollectNotNull.addSafe(ops, addConsumer(), addProducer(), addRequestRate());
    return new ClientQuotaAlteration(entityQuota, ops);
  }

  /** Small helper to append only non-null elements to a list. */
  public static final class CollectNotNull {
    /**
     * Adds every non-null element to {@code list}.
     *
     * @return false when {@code list} itself is null, true otherwise
     */
    @SafeVarargs
    public static <T> boolean addSafe(List<T> list, T... elements) {
      if (list == null) return false;
      for (T element : elements) {
        if (element != null) list.add(element);
      }
      return true;
    }
  }
}
58 changes: 58 additions & 0 deletions src/main/java/com/purbon/kafka/topology/quotas/QuotasManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.purbon.kafka.topology.quotas;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.model.User;
import com.purbon.kafka.topology.model.users.Quota;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Applies and removes client quotas (currently only user-principal based quotas) through the
 * Kafka AdminClient.
 */
public class QuotasManager {

  private static final Logger LOGGER = LogManager.getLogger(QuotasManager.class);

  private final AdminClient adminClient;
  private final Configuration config;

  public QuotasManager(AdminClient adminClient, Configuration config) {
    this.adminClient = adminClient;
    this.config = config;
  }

  /**
   * Applies the given quotas, one alteration per quota, keyed by user principal.
   *
   * <p>Waits for the broker round-trip and logs failures; the original code discarded the result
   * future, so any broker-side error was silently dropped.
   */
  public void assignQuotasPrincipal(Collection<Quota> quotas) {
    List<ClientQuotaAlteration> lstQuotasAlteration =
        quotas.stream()
            .map(quota -> new QuotasClientBindingsBuilder(quota).build())
            .collect(Collectors.toList());
    try {
      this.adminClient.alterClientQuotas(lstQuotasAlteration).all().get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status for the caller
      LOGGER.error("Interrupted while assigning client quotas", e);
    } catch (ExecutionException e) {
      // Best-effort contract preserved: surface the failure in the log without throwing.
      LOGGER.error("Failed to assign client quotas", e);
    }
  }

  /**
   * Removes all quota limits for the given users by submitting alterations with empty values
   * (a null value for a quota key deletes it on the broker).
   */
  public void removeQuotasPrincipal(Collection<User> users) {
    List<ClientQuotaAlteration> lstQuotasRemove =
        users.stream()
            .map(
                user ->
                    new QuotasClientBindingsBuilder(
                            new Quota(
                                user.getPrincipal(),
                                Optional.empty(),
                                Optional.empty(),
                                Optional.empty()))
                        .build())
            .collect(Collectors.toList());
    try {
      this.adminClient.alterClientQuotas(lstQuotasRemove).all().get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status for the caller
      LOGGER.error("Interrupted while removing client quotas", e);
    } catch (ExecutionException e) {
      // Best-effort contract preserved: surface the failure in the log without throwing.
      LOGGER.error("Failed to remove client quotas", e);
    }
  }

  /** Fetches all client quotas currently defined on the cluster. Currently unused. */
  private Map<ClientQuotaEntity, Map<String, Double>> describeClientQuotas()
      throws ExecutionException, InterruptedException {
    return this.adminClient.describeClientQuotas(ClientQuotaFilter.all()).entities().get();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,42 @@ private Optional<PlatformSystem> doKSqlElements(JsonParser parser, JsonNode node
}

private Optional<PlatformSystem> doStreamsElements(JsonParser parser, JsonNode node)
throws JsonProcessingException {
throws IOException {
List<KStream> streams =
new JsonSerdesUtils<KStream>().parseApplicationUser(parser, node, KStream.class);
new JsonSerdesUtils<KStream>()
.parseApplicationUser(parser, node, KStream.class).stream()
.map(
ks -> {
ks.getTopics().putIfAbsent(KStream.READ_TOPICS, Collections.emptyList());
ks.getTopics().putIfAbsent(KStream.WRITE_TOPICS, Collections.emptyList());
return ks;
})
.collect(Collectors.toList());

for (KStream ks : streams) {
var topics = ks.getTopics();
if (topics.get(KStream.READ_TOPICS).isEmpty() || topics.get(KStream.WRITE_TOPICS).isEmpty()) {
LOGGER.warn(
"A Kafka Streams application with Id ("
+ ks.getApplicationId()
+ ") and Principal ("
+ ks.getPrincipal()
+ ")"
+ " might require both read and write topics as per its "
+ "nature it is always reading and writing into Apache Kafka, be aware if you notice problems.");
}
if (topics.get(KStream.READ_TOPICS).isEmpty()) {
// should have at minimum read topics defined as we could think of write topics as internal
// topics being
// auto created.
throw new IOException(
"Kafka Streams application with Id "
+ ks.getApplicationId()
+ " and principal "
+ ks.getPrincipal()
+ " have missing read topics. This field is required.");
}
}
return Optional.of(new PlatformSystem(streams));
}

Expand Down
21 changes: 21 additions & 0 deletions src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ public void testMetadata() {
.containsKey("system");
}

@Test
public void testStreamsParsingOnlyReadTopicsShouldNotParseAsNull() {
  // A descriptor declaring only read topics must still expose both topic lists: the write
  // list is defaulted to an empty List instead of being left null.
  Topology topology =
      parser.deserialise(TestUtils.getResourceFile("/descriptor-streams-only-read.yaml"));

  Project project = topology.getProjects().get(0);

  for (KStream stream : project.getStreams()) {
    var topics = stream.getTopics();
    assertThat(topics.get(KStream.READ_TOPICS)).isNotNull().isInstanceOf(List.class);
    assertThat(topics.get(KStream.READ_TOPICS)).contains("topicA", "topicB");
    assertThat(topics.get(KStream.WRITE_TOPICS)).isNotNull().isInstanceOf(List.class);
  }
}

// A streams descriptor with only write topics must fail to parse, since read topics are
// required. NOTE(review): "Shoul" in the method name is a typo for "Should"; kept unchanged
// to avoid renaming a published test.
@Test(expected = TopologyParsingException.class)
public void testStreamsParsingOnlyWriteTopicsShoulRaiseAnException() {
  parser.deserialise(TestUtils.getResourceFile("/descriptor-streams-only-write.yaml"));
}

@Test
public void testDynamicFirstLevelAttributes() {
Topology topology =
Expand Down
Loading

0 comments on commit bf8212a

Please sign in to comment.