[#10704] Modify the logic so that the long type sortkey is properly assigned a partition.
minwoo-jung committed Apr 12, 2024
1 parent 6fbea5b commit f05f733
Showing 7 changed files with 146 additions and 15 deletions.
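Context for the change: Kafka's default partitioner picks a record's partition as toPositive(murmur2(serializedKeyBytes)) % numPartitions, while Pinot's MurmurPartitionFunction applies the same murmur2 hash to the string form of the partition-column value. With the long sortKey serialized by LongSerializer, the two sides hashed different bytes and so assigned different partitions; this commit keys the record with the UTF-8 bytes of sortKey.toString() so both hash identical input. A minimal sketch of that alignment, using the partition count and sortKey value from the tests in this commit (the class name is hypothetical):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionAlignmentSketch {
    public static void main(String[] args) {
        int numPartitions = 128;
        long sortKey = 5522573437844253163L; // value from comparePartitionOperationInKafkaAndPinotForLongSortKeyTest

        // The bytes Pinot's MurmurPartitionFunction hashes: the decimal string form of the value.
        byte[] keyBytes = Long.toString(sortKey).getBytes(StandardCharsets.UTF_8);

        // Kafka's keyed partitioning over the same bytes now yields the same partition.
        int kafkaPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println(kafkaPartition); // 1, matching the Pinot side in the test below
    }
}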
6 changes: 6 additions & 0 deletions inspector-module/inspector-collector/pom.xml
@@ -59,5 +59,11 @@
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-segment-spi</artifactId>
<version>${pinot.client.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
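The pinot-segment-spi dependency is test-scoped only: the new tests below exercise Pinot's MurmurPartitionFunction from that artifact to compare its partition assignment against Kafka's.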
@@ -40,7 +40,7 @@ public KafkaTemplate<String, AgentStat> kafkaAgentStatTemplate(@Qualifier("kafka
}

@Bean
-public KafkaTemplate<Long, AgentStatV2> kafkaAgentStatV2Template(@Qualifier("kafkaProducerLongKeyFactory") ProducerFactory producerFactory) {
+public KafkaTemplate<byte[], AgentStatV2> kafkaAgentStatV2Template(@Qualifier("kafkaProducerByteArrayKeyFactory") ProducerFactory producerFactory) {
return new KafkaTemplate<>(producerFactory);
}

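Why the key type moves from Long to byte[] rather than staying a Long: LongSerializer always emits the value's 8-byte big-endian encoding, whereas ByteArraySerializer is a pass-through, so the DAO can hand the producer exactly the bytes Pinot will hash. A small sketch of the difference (topic string and class name are arbitrary placeholders):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class KeyBytesSketch {
    public static void main(String[] args) {
        Long sortKey = -4545381519295174261L;

        // LongSerializer: fixed 8-byte big-endian encoding of the value.
        byte[] asLong = new LongSerializer().serialize("any-topic", sortKey);

        // The decimal-string UTF-8 bytes that Pinot's partition function hashes (20 bytes here).
        byte[] asText = sortKey.toString().getBytes(StandardCharsets.UTF_8);

        // ByteArraySerializer passes the caller's bytes through untouched.
        byte[] onWire = new ByteArraySerializer().serialize("any-topic", asText);

        System.out.println(asLong.length + " vs " + asText.length); // 8 vs 20
        System.out.println(Arrays.equals(asText, onWire));          // true
    }
}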
@@ -48,15 +48,15 @@ public class DefaultAgentStatDao <T extends AgentStatDataPoint> implements Agent
private final BiFunction<List<T>, String, List<AgentStat>> convertToKafkaAgentStatModelFunction;
private final Function<List<AgentStat>, List<ApplicationStat>> convertToKafkaApplicationStatModelFunction;
private final KafkaTemplate<String, AgentStat> kafkaAgentStatTemplate;
-private final KafkaTemplate<Long, AgentStatV2> kafkaAgentStatV2Template;
+private final KafkaTemplate<byte[], AgentStatV2> kafkaAgentStatV2Template;
private final KafkaTemplate<String, ApplicationStat> kafkaApplicationStatTemplate;
private final String agentStatTopic;
private final String agentStatTopicV2;
private final String applicationStatTopic;
private final TenantProvider tenantProvider;
private final HashFunction hashFunction = Hashing.murmur3_128();

-public DefaultAgentStatDao(Function<AgentStatBo, List<T>> dataPointFunction, KafkaTemplate<String, AgentStat> kafkaAgentStatTemplate, KafkaTemplate<Long, AgentStatV2> kafkaAgentStatV2Template, KafkaTemplate<String, ApplicationStat> kafkaApplicationStatTemplate, BiFunction<List<T>, String, List<AgentStat>> convertToKafkaAgentStatModelFunction, Function<List<AgentStat>, List<ApplicationStat>> convertToKafkaApplicationStatModelFunction, String agentStatTopic, String applicationStatTopic, TenantProvider tenantProvider) {
+public DefaultAgentStatDao(Function<AgentStatBo, List<T>> dataPointFunction, KafkaTemplate<String, AgentStat> kafkaAgentStatTemplate, KafkaTemplate<byte[], AgentStatV2> kafkaAgentStatV2Template, KafkaTemplate<String, ApplicationStat> kafkaApplicationStatTemplate, BiFunction<List<T>, String, List<AgentStat>> convertToKafkaAgentStatModelFunction, Function<List<AgentStat>, List<ApplicationStat>> convertToKafkaApplicationStatModelFunction, String agentStatTopic, String applicationStatTopic, TenantProvider tenantProvider) {
this.dataPointFunction = Objects.requireNonNull(dataPointFunction, "dataPointFunction");
this.kafkaAgentStatTemplate = Objects.requireNonNull(kafkaAgentStatTemplate, "kafkaAgentStatTemplate");
this.kafkaAgentStatV2Template = Objects.requireNonNull(kafkaAgentStatV2Template, "kafkaAgentStatV2Template");
@@ -77,8 +77,9 @@ public void insert(String agentId, List<T> agentStatData) {
}

List<AgentStatV2> agentStatV2List = convertToKafkaAgentStatV2Model(agentStatList);
byte[] kafkaKey = generateKafkaKey(agentStatV2List);
for (AgentStatV2 agentStatV2 : agentStatV2List) {
-kafkaAgentStatV2Template.send(agentStatTopicV2, agentStatV2.getSortKey(), agentStatV2);
+kafkaAgentStatV2Template.send(agentStatTopicV2, kafkaKey, agentStatV2);
}

List<ApplicationStat> applicationStatList = convertToKafkaApplicationStatModel(agentStatList);
@@ -88,14 +89,21 @@ public void insert(String agentId, List<T> agentStatData) {

}

private byte[] generateKafkaKey(List<AgentStatV2> agentStatV2List) {
if (agentStatV2List.isEmpty()) {
return new byte[0];
}

return agentStatV2List.get(0).getSortKey().toString().getBytes(StandardCharsets.UTF_8);
}

private List<AgentStatV2> convertToKafkaAgentStatV2Model(List<AgentStat> agentStatList) {
if (agentStatList.isEmpty()) {
return Collections.emptyList();
}

List<AgentStatV2> agentStatV2List = new ArrayList<>(agentStatList.size());
Long sortKey = hashFunction.hashString(agentStatList.get(0).getSortKey(), StandardCharsets.UTF_8).asLong();

for (AgentStat agentStat : agentStatList) {
agentStatV2List.add(new AgentStatV2(agentStat, sortKey));
}
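A note on the new generateKafkaKey method: every AgentStatV2 produced by convertToKafkaAgentStatV2Model carries the same sortKey (the murmur3_128 hash of the first element's sort-key string), so hashing only the first element is enough, and the empty-list branch's zero-length key is never actually sent because the send loop has nothing to iterate. A sketch of the full key derivation, with a hypothetical sort-key string and class name:

import java.nio.charset.StandardCharsets;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class SortKeyDerivationSketch {
    public static void main(String[] args) {
        HashFunction hashFunction = Hashing.murmur3_128();

        // Hypothetical sort-key string, shaped like the ones in the tests below.
        long sortKey = hashFunction
                .hashString("applicationName#agentId#metricName", StandardCharsets.UTF_8)
                .asLong();

        // What generateKafkaKey produces: the decimal string rendered as UTF-8 bytes.
        byte[] kafkaKey = Long.toString(sortKey).getBytes(StandardCharsets.UTF_8);
        System.out.println(kafkaKey.length); // at most 20 bytes: a '-' sign plus up to 19 digits
    }
}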
@@ -54,13 +54,13 @@
public class PinotDaoConfiguration {

private final KafkaTemplate<String, AgentStat> kafkaAgentStatTemplate;
-private final KafkaTemplate<Long, AgentStatV2> kafkaAgentStatV2Template;
+private final KafkaTemplate<byte[], AgentStatV2> kafkaAgentStatV2Template;
private final KafkaTemplate<String, ApplicationStat> kafkaApplicationStatTemplate;
private final String agentStatTopic;
private final String applicationStatTopic;
private final TenantProvider tenantProvider;

-public PinotDaoConfiguration(KafkaTemplate<String, AgentStat> kafkaAgentStatTemplate, KafkaTemplate<Long, AgentStatV2> kafkaAgentStatV2Template, KafkaTemplate<String, ApplicationStat> kafkaApplicationStatTemplate, @Value("${kafka.inspector.topic.agent}") String agentStatTopic, @Value("${kafka.inspector.topic.application}") String applicationStatTopic, TenantProvider tenantProvider) {
+public PinotDaoConfiguration(KafkaTemplate<String, AgentStat> kafkaAgentStatTemplate, KafkaTemplate<byte[], AgentStatV2> kafkaAgentStatV2Template, KafkaTemplate<String, ApplicationStat> kafkaApplicationStatTemplate, @Value("${kafka.inspector.topic.agent}") String agentStatTopic, @Value("${kafka.inspector.topic.application}") String applicationStatTopic, TenantProvider tenantProvider) {
this.kafkaAgentStatTemplate = Objects.requireNonNull(kafkaAgentStatTemplate, "kafkaAgentStatTemplate");
this.kafkaAgentStatV2Template = Objects.requireNonNull(kafkaAgentStatV2Template, "kafkaAgentStatV2Template");
this.kafkaApplicationStatTemplate = Objects.requireNonNull(kafkaApplicationStatTemplate, "kafkaApplicationStatTemplate");
@@ -16,22 +16,139 @@

package com.navercorp.pinpoint.inspector.collector.dao.pinot;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pinot.segment.spi.partition.MurmurPartitionFunction;
import org.junit.jupiter.api.Test;

-import static org.junit.jupiter.api.Assertions.*;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;

/**
* @author minwoo-jung
*/
class DefaultAgentStatDaoTest {

private final Logger logger = LogManager.getLogger(DefaultAgentStatDaoTest.class.getName());
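// Reference checks of Kafka's default keyed partitioning:
// partition = toPositive(murmur2(keyBytes)) % numPartitions.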
@Test
-public void patitionTest() {
+public void kafkaPatitionTest() {
int numPartitions = 64;
String key = "minwoo_local_app#jvmGc";
int partition = Utils.toPositive(Utils.murmur2(key.getBytes())) % numPartitions;
assertEquals(18, partition);
}

@Test
public void kafkaPatitionTest2() {
int numPartitions = 128;
String key = "minwoo_local_app2#minwoo_local_agent2#dataSource";
int partition = Utils.toPositive(Utils.murmur2(key.getBytes())) % numPartitions;
assertEquals(118, partition);
}

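// Guava's murmur3_128 is the hash DefaultAgentStatDao applies to the
// "applicationName#agentId#metricName" string to derive the long sortKey.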
// @Test
public void googleHashFunctionTest() {
HashFunction hashFunction = Hashing.murmur3_128();
String sortKey = "applicationName#agentId#metricName";
byte[] bytes = hashFunction.hashString(sortKey, StandardCharsets.UTF_8).asBytes();
logger.info(Arrays.toString(bytes));
}


@Test
public void googleHashFunctionTest2() {
HashFunction hashFunction = Hashing.murmur3_128();
String sortKey = "applicationName#agentId#metricName";
Long longValue = hashFunction.hashString(sortKey, StandardCharsets.UTF_8).asLong();
logger.info(longValue);

String sortKey2 = "applicationName#agentId#metricName";
Long longValue2 = hashFunction.hashString(sortKey2, StandardCharsets.UTF_8).asLong();
logger.info(longValue2);

assertEquals(longValue, longValue2);


String sortKey3 = "applicationName2#agentId2#metricName2";
Long longValue3 = hashFunction.hashString(sortKey3, StandardCharsets.UTF_8).asLong();
logger.info(longValue3);

String sortKey4 = "applicationName3#agentId3#metricName3";
Long longValue4 = hashFunction.hashString(sortKey4, StandardCharsets.UTF_8).asLong();
logger.info(longValue4);

assertNotEquals(longValue3, longValue4);

}

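// BuiltInPartitioner.partitionForKey is the same murmur2-based computation the
// producer applies to the serialized key bytes.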
@Test
public void kafkaPartitionForStringSortKeyTest() {
int numPartitions = 64;
StringSerializer keySerializer = new StringSerializer();
byte[] keyBytes = keySerializer.serialize("inspector-stat", "minwoo_local_app2#minwoo_local_agent2#dataSource");
int partition = BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
assertEquals(54, partition);
}


// Kafka partition result for the long type sortKey
@Test
public void kafkaPartitionForLongSortKeyTest() {
int numPartitions = 128;
Long longValue = -4545381519295174261L;

LongSerializer keySerializer = new LongSerializer();
byte[] keyBytes = keySerializer.serialize("inspector-stat", longValue);
int partition = BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
assertEquals(45, partition);
}

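// Pinot side: the same long lands on a different partition (105) than Kafka
// assigned above (45) when the key went through LongSerializer; this mismatch
// is what the commit fixes.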
@Test
public void pinotPartitionForLongSortKeyTest() {
int numPartitions = 128;
Long longValue = -4545381519295174261L;
MurmurPartitionFunction murmurPartitionFunction = new MurmurPartitionFunction(numPartitions);
int partition = murmurPartitionFunction.getPartition(longValue);
assertEquals(105, partition);
}

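// The fix end to end: hashing the decimal-string bytes makes Kafka and Pinot
// agree on the partition (both 1 for this value).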
@Test
public void comparePartitionOperationInKafkaAndPinotForLongSortKeyTest() {
int numPartitions = 128;
Long longValue = 5522573437844253163L;
MurmurPartitionFunction murmurPartitionFunction = new MurmurPartitionFunction(numPartitions);
int pinotPartition = murmurPartitionFunction.getPartition(longValue);
assertEquals(1, pinotPartition);

byte[] keyBytes = longValue.toString().getBytes(StandardCharsets.UTF_8);
int kafkaPartition = BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
assertEquals(1, kafkaPartition);
}

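// Root cause in byte form: LongSerializer emits the 8-byte big-endian encoding,
// while toString().getBytes(UTF_8) yields the decimal digits, so the hashes differ.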
@Test
public void compareConvetedByteArray() {
Long longValue = -4545381519295174261L;

LongSerializer keySerializer = new LongSerializer();
byte[] keyBytesByKafka = keySerializer.serialize("inspector-stat", longValue);

byte[] keyBytesByPinot = longValue.toString().getBytes(StandardCharsets.UTF_8);

logger.info(longValue.toString());
logger.info(Arrays.toString(keyBytesByKafka));
logger.info(Arrays.toString(keyBytesByPinot));
}



}
@@ -44,12 +44,12 @@ public ProducerFactory kafkaProducerFactory(KafkaProperties properties) {
}

@Bean
-public ProducerFactory kafkaProducerLongKeyFactory(KafkaProperties properties) {
+public ProducerFactory kafkaProducerByteArrayKeyFactory(KafkaProperties properties) {
logger.info("kafka {}:{}", ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
logger.debug("kafka config:{}", properties);

Map<String, Object> config = toConfig(properties);
-config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getLongKeySerializer());
+config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getByteArrayKeySerializer());
return new DefaultKafkaProducerFactory<>(config);
}

@@ -1,14 +1,14 @@
package com.navercorp.pinpoint.pinot.kafka;

import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class KafkaProperties {
private String bootstrapServers;
private String keySerializer = StringSerializer.class.getName();
-private String longKeySerializer = LongSerializer.class.getName();
+private String byteArrayKeySerializer = ByteArraySerializer.class.getName();
private String valueSerializer = JsonSerializer.class.getName();
private String partitionerClass = DefaultPartitioner.class.getName();
private String acks = "1";
@@ -31,8 +31,8 @@ public void setKeySerializer(String keySerializer) {
this.keySerializer = keySerializer;
}

-public String getLongKeySerializer() {
-    return longKeySerializer;
+public String getByteArrayKeySerializer() {
+    return byteArrayKeySerializer;
}

public String getValueSerializer() {
