How to get consumed Kafka message topic/partition/offset? #161
-
There are actually two ways to access that data.

Subscribing to the envelope

In your subscriber method, you can declare the input parameter as an IInboundEnvelope<TMessage> and read the Kafka offset from its BrokerMessageIdentifier:

class MySubscriber
{
    public void OnMessageReceived(IInboundEnvelope<YourMessage> envelope)
    {
        KafkaOffset kafkaOffset = (KafkaOffset)envelope.BrokerMessageIdentifier;
        string topicName = kafkaOffset.Topic;
        int partition = kafkaOffset.Partition;
        long offset = kafkaOffset.Offset;
        YourMessage message = envelope.Message!;
        // ...do the mapping or whatever you need to do with the message and the offset information
    }
}

Using a behavior to enrich your message

Custom behaviors can be plugged into the consumer/producer pipelines; see https://silverback-messaging.net/concepts/broker/behaviors.html#custom-behaviors.

public class TopicOffsetEnrichmentBehavior : IConsumerBehavior
{
    public Task HandleAsync(ConsumerPipelineContext context, ConsumerBehaviorHandler next)
    {
        if (context.Envelope is IInboundEnvelope envelope &&
            envelope.Message is YourMessage message &&
            envelope.BrokerMessageIdentifier is KafkaOffset kafkaOffset)
        {
            message.TopicName = kafkaOffset.Topic;
            message.Offset = kafkaOffset.Offset;
        }
        return next(context);
    }
}

You could use an interface or something similar to make it more generic, but this is the basic idea. I slightly prefer the behavior approach because it keeps the subscriber nice and clean, but that's up to you.

EDIT: The envelope of course contains the headers collection as well; see IInboundEnvelope<TMessage>.
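
The "interface or something similar" idea could be sketched roughly as follows. This is only an illustration, not Silverback code: `IKafkaOffsetAware`, `OrderCreated` and `KafkaOffsetEnricher` are hypothetical names, and the helper takes plain values so that it compiles without any Silverback reference.

```csharp
#nullable enable

// Hypothetical interface (not part of Silverback): implemented by any message
// that should carry the Kafka coordinates it was consumed from.
public interface IKafkaOffsetAware
{
    string? TopicName { get; set; }
    int Partition { get; set; }
    long Offset { get; set; }
}

// Hypothetical example message that opts in by implementing the interface.
public class OrderCreated : IKafkaOffsetAware
{
    public string? OrderId { get; set; }
    public string? TopicName { get; set; }
    public int Partition { get; set; }
    public long Offset { get; set; }
}

public static class KafkaOffsetEnricher
{
    // Copies the coordinates onto the message if it implements the interface.
    // Returns true when the message was enriched, false otherwise.
    public static bool TryEnrich(object? message, string topic, int partition, long offset)
    {
        if (message is not IKafkaOffsetAware aware)
            return false;

        aware.TopicName = topic;
        aware.Partition = partition;
        aware.Offset = offset;
        return true;
    }
}
```

Inside the behavior shown earlier, the type check on `YourMessage` would then no longer be needed: once `envelope.BrokerMessageIdentifier is KafkaOffset kafkaOffset` matches, the body can call `KafkaOffsetEnricher.TryEnrich(envelope.Message, kafkaOffset.Topic, kafkaOffset.Partition, kafkaOffset.Offset)`, and every message type implementing the interface gets enriched.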
-
Hello, I have started developing a project using Silverback to replace old .NET Framework applications for consuming and producing messages with Kafka.
The class I'm serializing / deserializing in my application needs to be updated with the key, offset and topic the consumer received it from. This is fairly easy with a regular Consumer, but in Silverback I can't find a way to get them. For the key, I have added a JsonIgnored property in my class with the header x-kafka-message-key, but as far as I can see there is no way to get the partition and the topic of the consumed message.