From 7b81cb3eab09fe53477371776a943693a7c96057 Mon Sep 17 00:00:00 2001 From: Sam Roberts Date: Wed, 4 Mar 2020 22:40:56 -0800 Subject: [PATCH] Support Node.js 13.x Builds and passes tests on Ubuntu with 13.x. Also fixes many deprecations. A few outstanding deprecations related to integrating with the new async context tracking API remain, so continuation-local-storage may (still) not work (have not checked). Fixes: https://github.com/Blizzard/node-rdkafka/issues/733 --- .travis.yml | 1 + src/admin.cc | 7 ++----- src/binding.cc | 16 +++++++++++----- src/callbacks.cc | 2 +- src/common.cc | 41 ++++++++++++++++++++++++++--------------- src/config.cc | 13 +++++++------ src/errors.cc | 4 ++-- src/kafka-consumer.cc | 8 +++++--- src/producer.cc | 15 ++++++++------- src/topic.cc | 4 ++-- src/workers.cc | 2 +- 11 files changed, 66 insertions(+), 47 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8f12ea2f..641e57cb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,6 +9,7 @@ node_js: - "8" - "10" - "12" + - "13" sudo: required services: docker before_install: diff --git a/src/admin.cc b/src/admin.cc index 0b84e607..6b8fa5a9 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -90,8 +90,8 @@ void AdminClient::Init(v8::Local exports) { constructor.Reset( (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked()); - exports->Set(Nan::New("AdminClient").ToLocalChecked(), - (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked()); + Nan::Set(exports, Nan::New("AdminClient").ToLocalChecked(), + tpl->GetFunction(Nan::GetCurrentContext()).ToLocalChecked()); } void AdminClient::New(const Nan::FunctionCallbackInfo& info) { @@ -547,9 +547,6 @@ NAN_METHOD(AdminClient::NodeCreatePartitions) { Nan::Callback *callback = new Nan::Callback(cb); AdminClient* client = ObjectWrap::Unwrap(info.This()); - // Get the timeout - int timeout = Nan::To(info[2]).FromJust(); - // Get the total number of desired partitions int partition_total_count = Nan::To(info[1]).FromJust(); diff --git a/src/binding.cc b/src/binding.cc index 5186c54a..28d6c985 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -64,24 +64,30 @@ void ConstantsInit(v8::Local exports) { NODE_DEFINE_CONSTANT(topicConstants, RdKafka::Topic::OFFSET_STORED); NODE_DEFINE_CONSTANT(topicConstants, RdKafka::Topic::OFFSET_INVALID); - exports->Set(Nan::New("topic").ToLocalChecked(), topicConstants); + Nan::Set(exports, Nan::New("topic").ToLocalChecked(), topicConstants); - exports->Set(Nan::New("err2str").ToLocalChecked(), + Nan::Set(exports, Nan::New("err2str").ToLocalChecked(), Nan::GetFunction(Nan::New(NodeRdKafkaErr2Str)).ToLocalChecked()); // NOLINT - exports->Set(Nan::New("features").ToLocalChecked(), + Nan::Set(exports, Nan::New("features").ToLocalChecked(), Nan::GetFunction(Nan::New(NodeRdKafkaBuildInFeatures)).ToLocalChecked()); // NOLINT } -void Init(v8::Local exports, v8::Local module) { +void Init(v8::Local exports, v8::Local m_, void* v_) { +#ifdef NODE_MAJOR_VERSION <= 8 AtExit(RdKafkaCleanup); +#else + v8::Local context = Nan::GetCurrentContext(); + node::Environment* env = node::GetCurrentEnvironment(context); + AtExit(env, RdKafkaCleanup, NULL); +#endif KafkaConsumer::Init(exports); Producer::Init(exports); AdminClient::Init(exports); Topic::Init(exports); ConstantsInit(exports); - exports->Set(Nan::New("librdkafkaVersion").ToLocalChecked(), + Nan::Set(exports, Nan::New("librdkafkaVersion").ToLocalChecked(), Nan::New(RdKafka::version_str().c_str()).ToLocalChecked()); } diff --git a/src/callbacks.cc b/src/callbacks.cc index dabc35bb..7b2509dc 100644 --- a/src/callbacks.cc +++ b/src/callbacks.cc @@ -42,7 +42,7 @@ v8::Local TopicPartitionListToV8Array( Nan::New(tp.offset)); } - tp_array->Set(i, tp_obj); + Nan::Set(tp_array, i, tp_obj); } return tp_array; diff --git a/src/common.cc b/src/common.cc index ea4054d2..c14b9d87 100644 --- a/src/common.cc +++ b/src/common.cc @@ -126,7 +126,11 @@ std::vector v8ArrayToStringVector(v8::Local parameter) { if (parameter->Length() >= 1) { for (unsigned int i = 0; i < parameter->Length(); i++) { - Nan::MaybeLocal p = Nan::To(parameter->Get(i)); + v8::Local v; + if (!Nan::Get(parameter, i).ToLocal(&v)) { + continue; + } + Nan::MaybeLocal p = Nan::To(v); if (p.IsEmpty()) { continue; } @@ -146,7 +150,10 @@ std::vector ToStringVector(v8::Local parameter) { if (parameter->Length() >= 1) { for (unsigned int i = 0; i < parameter->Length(); i++) { - v8::Local element = parameter->Get(i); + v8::Local element; + if (!Nan::Get(parameter, i).ToLocal(&element)) { + continue; + } if (!element->IsRegExp()) { Nan::MaybeLocal p = Nan::To(element); @@ -179,7 +186,7 @@ v8::Local ToV8Array(std::vector parameter) { for (size_t i = 0; i < parameter.size(); i++) { std::string topic = parameter[i]; - newItem->Set(i, Nan::New(topic).ToLocalChecked()); + Nan::Set(newItem, i, Nan::New(topic).ToLocalChecked()); } return newItem; @@ -203,7 +210,7 @@ v8::Local ToV8Array( topic_partition_list[topic_partition_i]; if (topic_partition->err() != RdKafka::ErrorCode::ERR_NO_ERROR) { - array->Set(topic_partition_i, + Nan::Set(array, topic_partition_i, Nan::Error(Nan::New(RdKafka::err2str(topic_partition->err())) .ToLocalChecked())); } else { @@ -220,7 +227,7 @@ v8::Local ToV8Array( Nan::New(topic_partition->topic().c_str()) .ToLocalChecked()); - array->Set(topic_partition_i, obj); + Nan::Set(array, topic_partition_i, obj); } } @@ -243,8 +250,11 @@ std::vector FromV8Array( for (size_t topic_partition_i = 0; topic_partition_i < topic_partition_list->Length(); topic_partition_i++) { - v8::Local topic_partition_value = - topic_partition_list->Get(topic_partition_i); + v8::Local topic_partition_value; + if (!Nan::Get(topic_partition_list, topic_partition_i) + .ToLocal(&topic_partition_value)) { + continue; + } if (topic_partition_value->IsObject()) { array.push_back(FromV8Object( @@ -308,7 +318,7 @@ v8::Local ToV8Object(RdKafka::Metadata* metadata) { Nan::Set(current_broker, Nan::New("port").ToLocalChecked(), Nan::New(x->port())); - broker_data->Set(broker_i, current_broker); + Nan::Set(broker_data, broker_i, current_broker); } unsigned int topic_i = 0; @@ -357,13 +367,13 @@ v8::Local ToV8Object(RdKafka::Metadata* metadata) { v8::Local current_replicas = Nan::New(); for (r_it = replicas->begin(); r_it != replicas->end(); ++r_it, r_i++) { - current_replicas->Set(r_i, Nan::New(*r_it)); + Nan::Set(current_replicas, r_i, Nan::New(*r_it)); } v8::Local current_isrs = Nan::New(); for (i_it = isrs->begin(); i_it != isrs->end(); ++i_it, i_i++) { - current_isrs->Set(i_i, Nan::New(*i_it)); + Nan::Set(current_isrs, i_i, Nan::New(*i_it)); } Nan::Set(current_partition, Nan::New("replicas").ToLocalChecked(), @@ -371,13 +381,13 @@ v8::Local ToV8Object(RdKafka::Metadata* metadata) { Nan::Set(current_partition, Nan::New("isrs").ToLocalChecked(), current_isrs); - current_topic_partitions->Set(partition_i, current_partition); + Nan::Set(current_topic_partitions, partition_i, current_partition); } // iterate over partitions Nan::Set(current_topic, Nan::New("partitions").ToLocalChecked(), current_topic_partitions); - topic_data->Set(topic_i, current_topic); + Nan::Set(topic_data, topic_i, current_topic); } // End iterating over topics Nan::Set(obj, Nan::New("orig_broker_id").ToLocalChecked(), @@ -431,7 +441,7 @@ v8::Local ToV8Object(RdKafka::Message *message, Nan::Set(v8header, Nan::New(it->key()).ToLocalChecked(), Nan::Encode(it->value_string(), it->value_size(), Nan::Encoding::BUFFER)); - v8headers->Set(index, v8header); + Nan::Set(v8headers, index, v8header); index++; } Nan::Set(pack, @@ -519,7 +529,8 @@ rd_kafka_NewTopic_t* FromV8TopicObject( if (!config_keys.IsEmpty()) { v8::Local field_array = config_keys.ToLocalChecked(); for (size_t i = 0; i < field_array->Length(); i++) { - v8::Local config_key = field_array->Get(i).As(); + v8::Local config_key = Nan::Get(field_array, i) + .ToLocalChecked().As(); v8::Local config_value = Nan::Get(config, config_key) .ToLocalChecked(); @@ -534,7 +545,7 @@ rd_kafka_NewTopic_t* FromV8TopicObject( err = rd_kafka_NewTopic_set_config( new_topic, pKeyString.c_str(), pValString.c_str()); - if (err != RdKafka::ERR_NO_ERROR) { + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { errstr = rd_kafka_err2str(err); rd_kafka_NewTopic_destroy(new_topic); return NULL; diff --git a/src/config.cc b/src/config.cc index 5db8c426..33ab9b02 100644 --- a/src/config.cc +++ b/src/config.cc @@ -35,6 +35,7 @@ void Conf::DumpConfig(std::list *dump) { } Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local object, std::string &errstr) { // NOLINT + v8::Local context = Nan::GetCurrentContext(); Conf* rdconf = static_cast(RdKafka::Conf::create(type)); v8::MaybeLocal _property_names = object->GetOwnPropertyNames( @@ -45,8 +46,8 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local object, std::string string_value; std::string string_key; - v8::Local key = property_names->Get(i); - v8::Local value = object->Get(key); + v8::Local key = Nan::Get(property_names, i).ToLocalChecked(); + v8::Local value = Nan::Get(object, key).ToLocalChecked(); if (key->IsString()) { Nan::Utf8String utf8_key(key); @@ -59,13 +60,13 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local object, #if NODE_MAJOR_VERSION > 6 if (value->IsInt32()) { string_value = std::to_string( - value->Int32Value(Nan::GetCurrentContext()).ToChecked()); + value->Int32Value(context).ToChecked()); } else if (value->IsUint32()) { string_value = std::to_string( - value->Uint32Value(Nan::GetCurrentContext()).ToChecked()); + value->Uint32Value(context).ToChecked()); } else if (value->IsBoolean()) { - string_value = value->BooleanValue( - Nan::GetCurrentContext()).ToChecked() ? "true" : "false"; + const bool v = Nan::To(value).ToChecked(); + string_value = v ? "true" : "false"; } else { Nan::Utf8String utf8_value(value.As()); string_value = std::string(*utf8_value); diff --git a/src/errors.cc b/src/errors.cc index 5b33ea2d..10ab3b19 100644 --- a/src/errors.cc +++ b/src/errors.cc @@ -19,9 +19,9 @@ v8::Local RdKafkaError(const RdKafka::ErrorCode &err, std::string er v8::Local ret = Nan::New(); - ret->Set(Nan::New("message").ToLocalChecked(), + Nan::Set(ret, Nan::New("message").ToLocalChecked(), Nan::New(errstr).ToLocalChecked()); - ret->Set(Nan::New("code").ToLocalChecked(), + Nan::Set(ret, Nan::New("code").ToLocalChecked(), Nan::New(code)); return ret; diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 31392ba3..de28c056 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -535,7 +535,7 @@ void KafkaConsumer::Init(v8::Local exports) { constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext())) .ToLocalChecked()); - exports->Set(Nan::New("KafkaConsumer").ToLocalChecked(), + Nan::Set(exports, Nan::New("KafkaConsumer").ToLocalChecked(), (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked()); } @@ -712,8 +712,10 @@ NAN_METHOD(KafkaConsumer::NodeAssign) { std::vector topic_partitions; for (unsigned int i = 0; i < partitions->Length(); ++i) { - v8::Local partition_obj_value = partitions->Get(i); - if (!partition_obj_value->IsObject()) { + v8::Local partition_obj_value; + if (!( + Nan::Get(partitions, i).ToLocal(&partition_obj_value) && + partition_obj_value->IsObject())) { Nan::ThrowError("Must pass topic-partition objects"); } diff --git a/src/producer.cc b/src/producer.cc index 9a9b5842..385a7968 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -90,8 +90,8 @@ void Producer::Init(v8::Local exports) { constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext())) .ToLocalChecked()); - exports->Set(Nan::New("Producer").ToLocalChecked(), - (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked()); + Nan::Set(exports, Nan::New("Producer").ToLocalChecked(), + tpl->GetFunction(Nan::GetCurrentContext()).ToLocalChecked()); } void Producer::New(const Nan::FunctionCallbackInfo& info) { @@ -450,17 +450,18 @@ NAN_METHOD(Producer::NodeProduce) { if (v8Headers->Length() >= 1) { for (unsigned int i = 0; i < v8Headers->Length(); i++) { - v8::Local header = v8Headers->Get(i)->ToObject( - Nan::GetCurrentContext()).ToLocalChecked(); + v8::Local header = Nan::Get(v8Headers, i).ToLocalChecked() + ->ToObject(Nan::GetCurrentContext()).ToLocalChecked(); if (header.IsEmpty()) { continue; } v8::Local props = header->GetOwnPropertyNames( Nan::GetCurrentContext()).ToLocalChecked(); - Nan::MaybeLocal v8Key = Nan::To(props->Get(0)); - Nan::MaybeLocal v8Value = - Nan::To(header->Get(v8Key.ToLocalChecked())); + Nan::MaybeLocal v8Key = Nan::To( + Nan::Get(props, 0).ToLocalChecked()); + Nan::MaybeLocal v8Value = Nan::To( + Nan::Get(header, v8Key.ToLocalChecked()).ToLocalChecked()); Nan::Utf8String uKey(v8Key.ToLocalChecked()); std::string key(*uKey); diff --git a/src/topic.cc b/src/topic.cc index d1ee37bb..4c9d663a 100644 --- a/src/topic.cc +++ b/src/topic.cc @@ -89,8 +89,8 @@ void Topic::Init(v8::Local exports) { constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext())) .ToLocalChecked()); - exports->Set(Nan::New("Topic").ToLocalChecked(), - (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked()); + Nan::Set(exports, Nan::New("Topic").ToLocalChecked(), + tpl->GetFunction(Nan::GetCurrentContext()).ToLocalChecked()); } void Topic::New(const Nan::FunctionCallbackInfo& info) { diff --git a/src/workers.cc b/src/workers.cc index ec69374d..ae2cf9c1 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -569,7 +569,7 @@ void KafkaConsumerConsumeNum::HandleOKCallback() { it != m_messages.end(); ++it) { i++; RdKafka::Message* message = *it; - returnArray->Set(i, Conversion::Message::ToV8Object(message)); + Nan::Set(returnArray, i, Conversion::Message::ToV8Object(message)); delete message; }