Skip to content

Commit

Permalink
Group metadata manager supports storing offsets (apache#27)
Browse files Browse the repository at this point in the history
Master Issue: #4 

*Motivation*

Support storing and committing offsets in group metadata manager.
  • Loading branch information
sijie authored Aug 5, 2019
1 parent 4dab945 commit 0e094bd
Show file tree
Hide file tree
Showing 6 changed files with 2,678 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import com.google.common.collect.Lists;
import io.streamnative.kop.coordinator.group.GroupMetadataManager.BaseKey;
import io.streamnative.kop.coordinator.group.GroupMetadataManager.GroupMetadataKey;
import io.streamnative.kop.coordinator.group.GroupMetadataManager.GroupTopicPartition;
import io.streamnative.kop.coordinator.group.GroupMetadataManager.OffsetKey;
import io.streamnative.kop.offset.OffsetAndMetadata;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Field;
Expand Down Expand Up @@ -199,6 +203,30 @@ private static <K, V> Map<K, V> asMap(KeyValue<K, V> ...kvs) {
));
}

/**
* Generates the key for offset commit message for given (group, topic, partition).
*
* @return key for offset commit message
*/
static byte[] offsetCommitKey(String group,
TopicPartition topicPartition) {
return offsetCommitKey(group, topicPartition, (short) 0);
}

static byte[] offsetCommitKey(String group,
TopicPartition topicPartition,
short versionId) {
Struct key = new Struct(CURRENT_OFFSET_KEY_SCHEMA);
key.set(OFFSET_KEY_GROUP_FIELD, group);
key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic());
key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition());

ByteBuffer byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf());
byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION);
key.writeTo(byteBuffer);
return byteBuffer.array();
}

/**
* Generates the key for group metadata message for given group.
*
Expand All @@ -213,6 +241,30 @@ static byte[] groupMetadataKey(String group) {
return byteBuffer.array();
}

/**
* Generates the payload for offset commit message from given offset and metadata.
*
* @param offsetAndMetadata consumer's current offset and metadata
* @return payload for offset commit message
*/
static byte[] offsetCommitValue(OffsetAndMetadata offsetAndMetadata) {
Struct value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA);
value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset());
value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata());
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp());
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp());
ByteBuffer byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf());
byteBuffer.putShort(CURRENT_GROUP_VALUE_SCHEMA_VERSION);
value.writeTo(byteBuffer);
return byteBuffer.array();
}


static byte[] groupMetadataValue(GroupMetadata groupMetadata,
Map<String, byte[]> assignment) {
return groupMetadataValue(groupMetadata, assignment, (short) 0);
}

/**
* Generates the payload for group metadata message from given offset and metadata
* assuming the generation id, selected protocol, leader and member assignment are all available.
Expand Down Expand Up @@ -285,7 +337,17 @@ static BaseKey readMessageKey(ByteBuffer buffer) {

if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
// version 0 and 1 refer to offset
throw new UnsupportedOperationException();
String group = key.getString(OFFSET_KEY_GROUP_FIELD);
String topic = key.getString(OFFSET_KEY_TOPIC_FIELD);
int partition = key.getInt(OFFSET_KEY_PARTITION_FIELD);
return new OffsetKey(
version,
new GroupTopicPartition(
group,
topic,
partition
)
);
} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
// version 2 refers to group
String group = key.getString(GROUP_KEY_GROUP_FIELD);
Expand All @@ -296,6 +358,33 @@ static BaseKey readMessageKey(ByteBuffer buffer) {
}
}

static OffsetAndMetadata readOffsetMessageValue(ByteBuffer buffer) {
if (null == buffer) {
return null;
}

short version = buffer.getShort();
Schema valueSchema = schemaForOffset(version);
Struct value = valueSchema.read(buffer);

if (version == 0) {
long offset = value.getLong(OFFSET_VALUE_OFFSET_FIELD_V0);
String metadata = value.getString(OFFSET_VALUE_METADATA_FIELD_V0);
long timestamp = value.getLong(OFFSET_VALUE_TIMESTAMP_FIELD_V0);

return OffsetAndMetadata.apply(offset, metadata, timestamp);
} else if (version == 1){
long offset = value.getLong(OFFSET_VALUE_OFFSET_FIELD_V1);
String metadata = value.getString(OFFSET_VALUE_METADATA_FIELD_V1);
long commitTimestamp = value.getLong(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1);
long expireTimestamp = value.getLong(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1);

return OffsetAndMetadata.apply(offset, metadata, commitTimestamp, expireTimestamp);
} else {
throw new IllegalStateException("Unknown offset message version " + version);
}
}

static GroupMetadata readGroupMessageValue(String groupId,
ByteBuffer buffer) {
if (null == buffer) { // tombstone
Expand Down
Loading

0 comments on commit 0e094bd

Please sign in to comment.