Skip to content

Commit

Permalink
[major] configs (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
guysegal authored Apr 3, 2024
1 parent 326f38d commit 1d3d1bb
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
17 changes: 8 additions & 9 deletions src/main/java/configuration/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.clients.consumer.*;

public class Config {

Expand All @@ -35,8 +32,9 @@ private enum AssignmentStrategyConfig {

//Optional
public static int KAFKA_POLL_INTERVAL_MS;
public static int MAX_POLL_RECORDS;
public static int KAFKA_FETCH_MAX_BYTES_CONFIG;
public static int KAFKA_MAX_POLL_RECORDS;
public static int KAFKA_FETCH_MAX_BYTES;
public static int KAFKA_MAX_PARTITION_FETCH_BYTES;
public static int BATCH_PARALLELISM_FACTOR;
public static int COMMIT_INTERVAL_MS;
public static String DEAD_LETTER_TOPIC;
Expand Down Expand Up @@ -103,9 +101,10 @@ public static void init() throws Exception {
RECORD_PICK_FIELD = getOptionalString(dotenv, "RECORD_PICK_FIELD", "");

KAFKA_POLL_INTERVAL_MS = getOptionalInt(dotenv, "KAFKA_POLL_INTERVAL_MS", 5 * 60 * 1000);

MAX_POLL_RECORDS = getOptionalInt(dotenv, "MAX_POLL_RECORDS", 1000);
KAFKA_FETCH_MAX_BYTES_CONFIG = getOptionalInt(dotenv, "KAFKA_FETCH_MAX_BYTES_CONFIG", 52428800);
KAFKA_MAX_POLL_RECORDS = getOptionalInt(dotenv, "KAFKA_MAX_POLL_RECORDS", 500);
KAFKA_FETCH_MAX_BYTES = getOptionalInt(dotenv, "KAFKA_FETCH_MAX_BYTES", ConsumerConfig.DEFAULT_FETCH_MAX_BYTES);
KAFKA_MAX_PARTITION_FETCH_BYTES =
getOptionalInt(dotenv, "KAFKA_MAX_PARTITION_FETCH_BYTES", ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES);

BATCH_PARALLELISM_FACTOR = getOptionalInt(dotenv, "BATCH_PARALLELISM_FACTOR", 5);

Expand Down
7 changes: 5 additions & 2 deletions src/main/java/kafka/KafkaClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,12 @@ public static ReceiverOptions<String, String> createReceiverOptions() {
String.join(",", Config.ASSIGNMENT_STRATEGY)
);
}
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Config.MAX_POLL_RECORDS);

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Config.KAFKA_POLL_INTERVAL_MS);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Config.KAFKA_FETCH_MAX_BYTES_CONFIG);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Config.KAFKA_MAX_POLL_RECORDS);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Config.KAFKA_FETCH_MAX_BYTES);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Config.KAFKA_MAX_PARTITION_FETCH_BYTES);

return ReceiverOptions.create(props);
}

Expand Down

0 comments on commit 1d3d1bb

Please sign in to comment.