Skip to content

Commit

Permalink
Propagate record timestamps (#21)
Browse files Browse the repository at this point in the history
Propagates kafka record timestamps for detection latency calculations
  • Loading branch information
sbaldwin-rs authored Jan 25, 2024
1 parent b0c3336 commit 406ca84
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 26 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Rockset Kafka Connect Changelog

## v2.1.0 2024-01-24
- Propagate kafka record timestamps for source detection latency

## v2.0.0 2023-10-30
- New configuration option `rockset.retry.backoff.ms`
- Removed deprecated configurations `rockset.apikey`, `rockset.collection`, and `rockset.workspace`
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>rockset</groupId>
<artifactId>kafka-connect-rockset</artifactId>
<version>2.0.0</version>
<version>2.1.0</version>
<packaging>jar</packaging>

<name>kafka-connect-rockset</name>
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/rockset/RocksetRequestWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
Expand Down Expand Up @@ -68,6 +69,15 @@ public void addDoc(
.key(key)
.offset(record.kafkaOffset())
.partition(record.kafkaPartition());

if(record.timestamp() != null){
if (record.timestampType() == TimestampType.CREATE_TIME){
message.createTime(record.timestamp());
} else if (record.timestampType() == TimestampType.LOG_APPEND_TIME){
message.logAppendTime(record.timestamp());
}
}

messages.add(message);
} catch (Exception e) {
throw new ConnectException("Invalid JSON encountered in stream ", e);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rockset/RocksetSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void put(Collection<SinkRecord> records) {
e ->
executorService.submit(
() ->
requestWrapper.addDoc(
requestWrapper.addDoc(
e.getKey().topic(),
e.getValue(),
recordParser,
Expand Down
79 changes: 55 additions & 24 deletions src/main/java/rockset/models/KafkaMessage.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package rockset.models;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModelProperty;
import java.util.Objects;

public class KafkaMessage {

Expand All @@ -23,6 +23,12 @@ public class KafkaMessage {
@SerializedName("key")
public Object key;

@SerializedName("create_time")
public Long createTime;

@SerializedName("log_append_time")
public Long logAppendTime;

/*
* Getters
*/
Expand Down Expand Up @@ -51,6 +57,19 @@ public Object getKey() {
return this.key;
}

@JsonProperty("create_time")
@ApiModelProperty(required = false, value = "Create time of the message")
public Long getCreateTime() {
return this.createTime;
}

@JsonProperty("log_append_time")
@ApiModelProperty(required = false, value = "Log append time of the message")
public Long getLogAppendTime() {
return this.logAppendTime;
}


/*
* Setters
*/
Expand All @@ -71,6 +90,16 @@ public void setKey(Object key) {
this.key = key;
}


public void setLogAppendTime(Long timestamp) {
this.logAppendTime = timestamp;
}

public void setCreateTime(Long timestamp) {
this.createTime = timestamp;
}


/*
* Builders
*/
Expand All @@ -95,6 +124,16 @@ public KafkaMessage key(Object key) {
return this;
}

public KafkaMessage logAppendTime(Long timestamp) {
this.logAppendTime = timestamp;
return this;
}

public KafkaMessage createTime(Long timestamp) {
this.createTime = timestamp;
return this;
}

/*
* Utilities
*/
Expand All @@ -107,35 +146,27 @@ private String toIndentedString(java.lang.Object o) {
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("class KafkaMessage {\n");

sb.append(" partition: ").append(this.toIndentedString(this.partition)).append("\n");
sb.append(" offset: ").append(this.toIndentedString(this.offset)).append("\n");
sb.append(" document: ").append(this.toIndentedString(this.document)).append("\n");
sb.append(" key: ").append(this.toIndentedString(this.key)).append("\n");
sb.append("}");
return sb.toString();
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
KafkaMessage that = (KafkaMessage) o;
return partition == that.partition && offset == that.offset && Objects.equal(document, that.document) && Objects.equal(key, that.key) && Objects.equal(createTime, that.createTime) && Objects.equal(logAppendTime, that.logAppendTime);
}

@Override
public int hashCode() {
return Objects.hash(this.document, this.partition, this.offset, this.key);
return Objects.hashCode(document, partition, offset, key, createTime, logAppendTime);
}

@Override
public boolean equals(java.lang.Object o) {
if (this == o) {
return true;
}
if (o == null || this.getClass() != o.getClass()) {
return false;
}
final KafkaMessage kafkaMessage = (KafkaMessage) o;
return this.getPartition() == kafkaMessage.getPartition()
&& this.getOffset() == kafkaMessage.getOffset()
&& Objects.equals(this.document, kafkaMessage.document)
&& Objects.equals(this.key, kafkaMessage.key);
public String toString() {
return "KafkaMessage{" +
"document=" + document +
", partition=" + partition +
", offset=" + offset +
", key=" + key +
", createTime=" + createTime +
", logAppendTime=" + logAppendTime +
'}';
}
}

0 comments on commit 406ca84

Please sign in to comment.