Skip to content

Commit

Permalink
make the datatype field as optional
Browse files Browse the repository at this point in the history
  • Loading branch information
purbon committed Dec 20, 2019
1 parent 28c3f7c commit 15d7cb9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 29 deletions.
13 changes: 6 additions & 7 deletions example/descriptor.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@

---
team: "team"
source: "source"
projects:
- name: "foo"
zookeepers: []
consumers:
- principal: "User:app0"
- principal: "User:app1"
- principal: "User:App0"
- principal: "User:App1"
producers: []
streams:
- principal: "User:App0"
topics:
read:
- "topicA"
- "topicB"
write:
- "topicC"
Expand All @@ -29,12 +29,11 @@ projects:
- "topicC"
- "topicD"
topics:
- dataType: "unknown"
name: "foo"
- name: "foo"
config:
replication.factor: "1"
num.partitions: "1"
- dataType: "unknown"
- dataType: "avro"
name: "bar"
config:
replication.factor: "1"
Expand All @@ -46,7 +45,7 @@ projects:
streams: []
connectors: []
topics:
- dataType: "unknown"
- dataType: "avro"
name: "bar"
config:
replication.factor: "1"
Expand Down
29 changes: 23 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@
</reporting>


<properties>
<jackson.version>2.10.0</jackson.version>
<kafka.version>2.3.0</kafka.version>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>

<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-pmd-plugin -->
Expand All @@ -189,36 +194,48 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.10.0</version>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.12.1</version>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
<version>${log4j.version}</version>
</dependency>

<dependency>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/purbon/kafka/topology/TopologySerdes.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.purbon.kafka.topology.model.Project;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
Expand All @@ -16,6 +17,7 @@ public class TopologySerdes {

public TopologySerdes() {
mapper = new ObjectMapper(new YAMLFactory());
mapper.registerModule(new Jdk8Module());
mapper.findAndRegisterModules();
}

Expand Down
21 changes: 12 additions & 9 deletions src/main/java/com/purbon/kafka/topology/model/Topic.java
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
package com.purbon.kafka.topology.model;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.purbon.kafka.topology.TopicManager;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class Topic {

public static final String UNKNOWN_DATATYPE = "unknown";
public static final String DEFAULT_TOPIC_NAME = "default";
private String dataType;
@JsonInclude(Include.NON_EMPTY)
private Optional<String> dataType;
private String name;
private HashMap<String, String> config;

private Project project;

public Topic(String name) {
this(name, UNKNOWN_DATATYPE, new HashMap<>());
this(name, Optional.empty(), new HashMap<>());
}

public Topic(String name, String dataType) {
this(name, dataType, new HashMap<>());
this(name, Optional.of(dataType), new HashMap<>());
}

public Topic(String name, String dataType, HashMap<String, String> config) {
public Topic(String name, Optional<String> dataType, HashMap<String, String> config) {
this.name = name;
this.dataType = dataType;
this.config = config;
}

public Topic() {
this(DEFAULT_TOPIC_NAME, UNKNOWN_DATATYPE, new HashMap<>());
this(DEFAULT_TOPIC_NAME, Optional.empty(), new HashMap<>());
}

public String getName() {
Expand All @@ -42,9 +45,9 @@ private String toString(Project project) {
.append(".")
.append(getName());

if (!getDataType().equals(UNKNOWN_DATATYPE)) {
if (getDataType().isPresent()) {
sb.append(".")
.append(getDataType());
.append(getDataType().get());
}

return sb.toString();
Expand Down Expand Up @@ -73,7 +76,7 @@ public Map<String, String> rawConfig() {
return getConfig();
}

public String getDataType() {
public Optional<String> getDataType() {
return dataType;
}

Expand Down
50 changes: 43 additions & 7 deletions src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,14 @@ public void testTopicConfigSerdes() throws IOException {
Topic topic = new Topic();
topic.setName("foo");
HashMap<String, String> topicConfig = new HashMap<>();
topicConfig.put("num.partitions", "3");
topicConfig.put("replication.factor", "2");
topicConfig.put("num.partitions", "1");
topicConfig.put("replication.factor", "1");
topic.setConfig(topicConfig);

Topic topicBar = new Topic();
topicBar.setName("bar");
Topic topicBar = new Topic("bar", "avro");
HashMap<String, String> topicBarConfig = new HashMap<>();
topicBarConfig.put("num.partitions", "3");
topicBarConfig.put("replication.factor", "2");
topicBarConfig.put("num.partitions", "1");
topicBarConfig.put("replication.factor", "1");
topicBar.setConfig(topicBarConfig);

Project project = new Project("foo");
Expand Down Expand Up @@ -99,6 +98,7 @@ public void testTopicConfigSerdes() throws IOException {
topology.setProjects(Arrays.asList(project, project2));

String topologyYamlString = parser.serialise(topology);
System.out.println(topologyYamlString);

Topology deserTopology = parser.deserialise(topologyYamlString);

Expand All @@ -107,9 +107,45 @@ public void testTopicConfigSerdes() throws IOException {

Assert.assertEquals(topic.getName(), serdesTopic.getName());
Assert.assertEquals(topic.getConfig().get("num.partitions"), serdesTopic.getConfig().get("num.partitions"));

}

@Test
public void testTopicWithDataType() throws IOException {

Project project = new Project("foo");

Topology topology = new Topology();
topology.setTeam("team");
topology.setSource("source");

project.setTopology(topology);
topology.addProject(project);

Topic topic = new Topic("foo", "json");
HashMap<String, String> topicConfig = new HashMap<>();
topicConfig.put("num.partitions", "3");
topicConfig.put("replication.factor", "2");
topic.setConfig(topicConfig);

project.addTopic(topic);

Topic topic2 = new Topic("topic2");
topic.setConfig(topicConfig);
project.addTopic(topic2);

String topologyYamlString = parser.serialise(topology);
Topology deserTopology = parser.deserialise(topologyYamlString);

Project serdesProject = deserTopology.getProjects().get(0);
Topic serdesTopic = serdesProject.getTopics().get(0);

Assert.assertEquals(topic.getDataType(), serdesTopic.getDataType());
Assert.assertEquals(topic.getDataType().get(), serdesTopic.getDataType().get());

Topic serdesTopic2 = serdesProject.getTopics().get(1);
Assert.assertEquals(topic2.getDataType(), serdesTopic2.getDataType());

}


private List<Project> buildProjects() {
Expand Down

0 comments on commit 15d7cb9

Please sign in to comment.