
message header and message type #48

Closed
hungchinchang opened this issue Jun 18, 2020 · 14 comments

@hungchinchang

  1. As a producer, is it possible to add a message type and message header to the message if we are publishing multiple message types to the same topic?

  2. As a consumer, is it possible to include the message header and message type in the message, so we can distinguish the message type/header information without going into the data payload? Currently, the consumer only sees certain information in the callback function.

@sshanks-kx
Contributor

Thanks for creating an issue.
We will review possible changes & report back in due course.
Thanks

@hungchinchang
Author

Just some background to the issue:

If we want to send different message types through the same topic, it seems the library doesn't allow us to specify the message type/header in the producer publish function.

In addition, when other processes publish a message type/header, it is not shown in our consumer callback function (e.g. mtype is always empty).

I assume the way to get around this is to have the message type embedded in the data payload (JSON format) if we want to publish to the same topic - partition.

Another method I can think of is to publish/consume different message events within the same topic but on different partitions.

I am not sure how other kdb+ users use this library, or whether I am missing something. I'd like to know what the best practice is, and whether message types and headers can be included.

Thanks,

NIon

@sshanks-kx
Contributor

Mtype is the internal librdkafka message representation (e.g. an end-of-partition message), rather than a user-defined type that is sent/received (i.e. Kafka can internally produce message objects with the error code set which are not messages sent by a client). So you may see the same type for all client messages (normal operation), and an end-of-partition type once all current messages have been consumed.
It actually comes from the Kafka library's message-object error code rather than a message type, so it's probably not well named; error codes might make a user think something bad has happened when it can be normal (e.g. the end-of-partition error message).

Kafka's internal header holds the key (for partitioning, etc.) and general internal Kafka processing fields (message length, etc.), rather than user-defined types or business application data (Ref: https://docs.confluent.io/2.0.0/clients/librdkafka/classRdKafka_1_1Message.html https://kafka.apache.org/documentation/#record ).
User content is placed in the payload (e.g. a user could define their own separated header/content within the payload section). So an architect of a given system may come up with the best solution for their use case (topic organisation, payload style, etc.), but that may be beyond the facilities of the general interface, which just gets data to/from Kafka itself.

thanks

@hungchinchang
Author

Thanks,

I understand what mtype is now. Our company has deployed Kafka version 0.11, and it seems to support headers in the messages sent.

```c
#define RD_KAFKA_V_HEADERS(HDRS) \
        _LRK_TYPECHECK(RD_KAFKA_VTYPE_HEADERS, rd_kafka_headers_t *, HDRS), \
        (rd_kafka_headers_t *)HDRS
```
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h

line 1161 and line 1176.

They are going to put information in the message so it carries header information. Unfortunately, I don't think the current kdb+ Kafka implementation has this, hence we can't specify it in the messages received/published?

Is this a functionality that can be added into the general interface?

Thanks,

@sshanks-kx
Contributor

Thanks, I see it all now.
Looks like it requires at least librdkafka 0.11.4 (luckily the RedHat 7 etc. default package is 0.11.4) and broker support (will have to add checks for that also in potential builds/calls, since users might have older installs).

Further reference material:
https://github.com/edenhill/librdkafka/releases/tag/v0.11.4
https://docs.confluent.io/5.0.0/clients/librdkafka/rdkafka_8h.html

Will have to run some tests etc. and update the issue when done with what might be possible, and what changes might have to occur.

Thanks

sshanks-kx self-assigned this Jun 22, 2020
@sshanks-kx
Contributor

Added some basic code to publish with header names & values. With the added code, the subscriber can be seen getting the names & values.
Testing further, and will create appropriate functions/types & version checks before creating a pull request to add the code.

sshanks-kx added a commit to sshanks-kx/kafka that referenced this issue Jun 23, 2020
sshanks-kx added a commit to sshanks-kx/kafka that referenced this issue Jun 23, 2020
@hungchinchang
Author

Looks good! Looking forward to using it once it's ready. Will you also update the code.kx.com site and give some examples either there or here, so I can do some testing myself too?

@hungchinchang
Author

Just looking at the pull requests:

the consumer function accepts a headers field where the key is a sym and the value is a byte/char array;

the publisher function can publish headers with sym!bytes (why not a char array as well?).

I assume the Kafka C library does not support other types?

@sshanks-kx
Contributor

Publish should be able to use a byte array or char array for header values; it'll convert either to bytes on the send.
The underlying API sends/gets data as bytes.
Did you see something that showed a problem where it couldn't? Thanks

@hungchinchang
Author

Currently you have two PRs; is it possible to get them into one, so I can potentially build and test it myself if the review is going to take a long time?

@sshanks-kx
Contributor

Quick way:

```sh
git clone https://github.com/KxSystems/kafka.git
cd kafka
git pull origin pull/50/head
git pull origin pull/51/head
```

@hungchinchang
Author

I have tested the PRs and they seem to be working as expected.

Are there any existing libraries in kx to integrate kafka with Avro?

@sshanks-kx
Contributor

We don't have one currently, but something we may be looking at in the future.
Other people may have done one & posted on the 'net.
Thanks

@mshimizu-kx
Contributor

Looks like the original request was satisfied.
