diff --git a/e2e/both.spec.js b/e2e/both.spec.js index a8289ec3..8cff3981 100644 --- a/e2e/both.spec.js +++ b/e2e/both.spec.js @@ -409,6 +409,27 @@ describe('Consumer/Producer', function() { ]; run_headers_test(done, headers); }); + + it('should be able to produce and consume messages with one header with NULL byte', function(done) { + var b = Buffer.alloc(3); + b.writeUInt8(1, 0); + b.writeUInt8(0, 1); + b.writeUInt8(1, 2); + var headers = [ + { key1: b }, + ]; + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages with one header with partial UTF-8 sequences byte', function(done) { + //first three bytes encode the euro sign '€' in UTF-8, the fourth byte marks the beginning of a three byte UTF-8 code, + //followed by a single byte UTF-8 code encoding the dollar sign, therefore the fourth byte is a partial UTF-8 sequence + var b = Buffer.from([0xE2, 0x82, 0xAC, 0xE2, 0x24]); + var headers = [ + { key1: b }, + ]; + run_headers_test(done, headers); + }); it('should be able to produce and consume messages with multiple headers value as string: consumeLoop', function(done) { var headers = [ @@ -621,11 +642,14 @@ describe('Consumer/Producer', function() { var messageKey = Object.keys(messageHeaders[i]); t.equal(messageKey.length, 1, 'Expected only one Header key'); t.equal(expectedKey, messageKey[0], 'Expected key does not match message key'); - var expectedValue = Buffer.isBuffer(expectedHeaders[i][expectedKey]) ? - expectedHeaders[i][expectedKey].toString() : - expectedHeaders[i][expectedKey]; - var actualValue = messageHeaders[i][expectedKey].toString(); - t.equal(expectedValue, actualValue, 'invalid message header'); + var expectedValue = expectedHeaders[i][expectedKey]; + var actualValue = messageHeaders[i][expectedKey]; + if (Buffer.isBuffer(expectedValue)) { + t.deepStrictEqual(expectedValue, actualValue, 'invalid message header'); + } else { + actualValue = actualValue.toString(); + t.equal(expectedValue, actualValue, 'invalid message header') + } } } diff --git a/src/producer.cc b/src/producer.cc index 04e75688..ef3c9f24 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -560,6 +560,9 @@ NAN_METHOD(Producer::NodeProduce) { v8::Local v8Headers = v8::Local::Cast(info[6]); if (v8Headers->Length() >= 1) { + const void* header_buffer_data; + size_t header_buffer_size; + for (unsigned int i = 0; i < v8Headers->Length(); i++) { v8::Local header = Nan::Get(v8Headers, i).ToLocalChecked() ->ToObject(Nan::GetCurrentContext()).ToLocalChecked(); @@ -571,16 +574,32 @@ NAN_METHOD(Producer::NodeProduce) { Nan::GetCurrentContext()).ToLocalChecked(); Nan::MaybeLocal v8Key = Nan::To( Nan::Get(props, 0).ToLocalChecked()); - Nan::MaybeLocal v8Value = Nan::To( - Nan::Get(header, v8Key.ToLocalChecked()).ToLocalChecked()); + + v8::Local v8Value = + Nan::Get(header, v8Key.ToLocalChecked()).ToLocalChecked(); + + if (node::Buffer::HasInstance(v8Value)) { + v8::Local key_buffer_object = + (v8Value->ToObject(Nan::GetCurrentContext())).ToLocalChecked(); + + header_buffer_data = node::Buffer::Data(v8Value); + header_buffer_size = node::Buffer::Length(v8Value); + } else { + v8::Local v8String = + Nan::To(v8Value).ToLocalChecked(); + + Nan::Utf8String uValue(v8String); + std::string value(*uValue, uValue.length()); + + header_buffer_data = value.data(); + header_buffer_size = value.length(); + } Nan::Utf8String uKey(v8Key.ToLocalChecked()); std::string key(*uKey); - Nan::Utf8String uValue(v8Value.ToLocalChecked()); - std::string value(*uValue); headers.push_back( - RdKafka::Headers::Header(key, value.c_str(), value.size())); + RdKafka::Headers::Header(key, header_buffer_data, header_buffer_size)); } } }