Skip to content

Commit

Permalink
Quotas implementation, only quotas based in user principal (#376)
Browse files Browse the repository at this point in the history
* Resolve #25: Quotas implementation, only quotas based in user principal

* Add doc usage of quotas

Co-authored-by: Pere Urbón <purbon@users.noreply.github.com>
Co-authored-by: Marcos Martinez Cedillo <marcos.martinez@orangebank.es>
  • Loading branch information
3 people committed Dec 9, 2021
1 parent 1d4f8cd commit 236d015
Show file tree
Hide file tree
Showing 8 changed files with 620 additions and 0 deletions.
72 changes: 72 additions & 0 deletions docs/futures/use-of-quotas.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
Use of Quotas
*******************************

Define the use of quotas kafka config in JulieOps
Kafka quotas design can be read in the official doc in the next link: https://kafka.apache.org/documentation/#design_quotas

The quotas define a user consumption/producer byte limits or con be configured a request_rate network usage.
With this point can be modified the quotas usage with a GitOps culture.

Defining the quota usage
-----------

As an operator of Julie Ops you can define a set of custom plans, this would be a file that look like this:

.. code-block:: YAML
kafka:
quotas:
- principal: "User:App0"
producer_byte_rate: 1024
consumer_byte_rate: 1024
request_percentage: 50.0
- principal: "User:App1"
producer_byte_rate: 2048
consumer_byte_rate: 2048
- principal: "User:App2"
request_percentage: 80.0
Using quotas in the Topology
-----------

The quotas usage are defined to be relate with a user consumer or producer

.. code-block:: YAML
context: "contextOrg"
source: "source"
projects:
- name: "foo"
topics:
- name: "foo"
config:
replication.factor: "1"
num.partitions: "1"
consumers:
- principal: "User:App0"
- principal: "User:App1"
producers:
- principal: "User:App0"
- name: "fooBar"
config:
replication.factor: "1"
- name: "barFoo"
config:
replication.factor: "1"
- name: "barFooBar"
config:
replication.factor: "1"
platform:
kafka:
quotas:
- principal: "User:App0"
producer_byte_rate: 1024
consumer_byte_rate: 1024
request_percentage: 50.0
- principal: "User:App1"
producer_byte_rate: 2048
consumer_byte_rate: 2048
- principal: "User:App2"
request_percentage: 80.0
70 changes: 70 additions & 0 deletions src/main/java/com/purbon/kafka/topology/model/users/Quota.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.purbon.kafka.topology.model.users;

import java.util.Optional;

public class Quota {

private String principal;
private Optional<Double> producer_byte_rate;
private Optional<Double> consumer_byte_rate;
private Optional<Double> request_percentage;

public Quota() {
this.principal = null;
producer_byte_rate = Optional.empty();
consumer_byte_rate = Optional.empty();
request_percentage = Optional.empty();
}

public Quota(
String principal,
Optional<Double> producer_byte_rate,
Optional<Double> consumer_byte_rate,
Optional<Double> request_percentage) {
this();
this.principal = principal;
this.producer_byte_rate = producer_byte_rate;
this.consumer_byte_rate = consumer_byte_rate;
this.request_percentage = request_percentage;
}

public Quota(
String principal, Optional<Double> producer_byte_rate, Optional<Double> consumer_byte_rate) {
this();
this.principal = principal;
this.producer_byte_rate = producer_byte_rate;
this.consumer_byte_rate = consumer_byte_rate;
}

public String getPrincipal() {
return principal;
}

public void setPrincipal(String principal) {
this.principal = principal;
}

public Optional<Double> getProducer_byte_rate() {
return producer_byte_rate;
}

public void setProducer_byte_rate(Optional<Double> producer_byte_rate) {
this.producer_byte_rate = producer_byte_rate;
}

public Optional<Double> getConsumer_byte_rate() {
return consumer_byte_rate;
}

public void setConsumer_byte_rate(Optional<Double> consumer_byte_rate) {
this.consumer_byte_rate = consumer_byte_rate;
}

public Optional<Double> getRequest_percentage() {
return request_percentage;
}

public void setRequest_percentage(Optional<Double> request_percentage) {
this.request_percentage = request_percentage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.purbon.kafka.topology.quotas;

import com.purbon.kafka.topology.model.users.Quota;
import java.util.*;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class QuotasClientBindingsBuilder {

private static final String CONSUMER_BYTE_RATE = "consumer_byte_rate";
private static final String PRODUCER_BYTE_RATE = "producer_byte_rate";
private static final String REQUEST_RATE = "request_percentage";

Quota quota = null;

public QuotasClientBindingsBuilder(Quota quota) {
this.quota = quota;
}

private ClientQuotaEntity buildClient() {
Map<String, String> entries = new HashMap<>();
entries.put(ClientQuotaEntity.USER, null);
if (!quota.getPrincipal().isEmpty()) {
entries.put(ClientQuotaEntity.USER, quota.getPrincipal());
}
return new ClientQuotaEntity(entries);
}

private ClientQuotaAlteration.Op addConsumer() {
return new ClientQuotaAlteration.Op(
CONSUMER_BYTE_RATE, quota.getConsumer_byte_rate().orElse(null));
}

private ClientQuotaAlteration.Op addProducer() {
return new ClientQuotaAlteration.Op(
PRODUCER_BYTE_RATE, quota.getProducer_byte_rate().orElse(null));
}

private ClientQuotaAlteration.Op addRequestRate() {
return new ClientQuotaAlteration.Op(REQUEST_RATE, quota.getRequest_percentage().orElse(null));
}

public ClientQuotaAlteration build() {
ClientQuotaEntity entityQuota = buildClient();
List ops = new ArrayList();
CollectNotNull.addSafe(ops, addConsumer(), addProducer(), addRequestRate());
return new ClientQuotaAlteration(entityQuota, ops);
}

public static final class CollectNotNull {
public static <T> boolean addSafe(List<T> list, T... elements) {
if (list == null) return false;
for (T element : elements) {
if (element != null) list.add(element);
}
return true;
}
}
}
58 changes: 58 additions & 0 deletions src/main/java/com/purbon/kafka/topology/quotas/QuotasManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.purbon.kafka.topology.quotas;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.model.User;
import com.purbon.kafka.topology.model.users.Quota;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class QuotasManager {

private static final Logger LOGGER = LogManager.getLogger(QuotasManager.class);

private final AdminClient adminClient;
// private final TopologyBuilderAdminClient adminClient;
private final Configuration config;

public QuotasManager(AdminClient adminClient, Configuration config) {
this.adminClient = adminClient;
this.config = config;
}

public void assignQuotasPrincipal(Collection<Quota> quotas) {
List<ClientQuotaAlteration> lstQuotasAlteration =
quotas.stream()
.map(f -> new QuotasClientBindingsBuilder(f).build())
.collect(Collectors.toList());

this.adminClient.alterClientQuotas(lstQuotasAlteration).all();
}

public void removeQuotasPrincipal(Collection<User> users) {
List<ClientQuotaAlteration> lstQuotasRemove =
users.stream()
.map(
f ->
new QuotasClientBindingsBuilder(
new Quota(
f.getPrincipal(),
Optional.empty(),
Optional.empty(),
Optional.empty()))
.build())
.collect(Collectors.toList());
this.adminClient.alterClientQuotas(lstQuotasRemove);
}

private void describeClientQuotas() throws ExecutionException, InterruptedException {
Map<ClientQuotaEntity, Map<String, Double>> quotasresult =
this.adminClient.describeClientQuotas(ClientQuotaFilter.all()).entities().get();
}
}
Loading

0 comments on commit 236d015

Please sign in to comment.