-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HTTP lookups for c++ client lib #317
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks decent, but it's somewhat unfortunate that we'd need to implement an HTTP-subset client library for this.
what was the issue you ran into with using curl? integrating with the asio event loop?
pulsar-client-cpp/lib/ClientImpl.cc
Outdated
producerIdGenerator_(0), | ||
consumerIdGenerator_(0), | ||
requestIdGenerator_(0) { | ||
LogUtils::init(clientConfiguration.getLogConfFilePath()); | ||
if ( !strncmp(serviceUrl_.c_str(), "http", 4)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (serviceUrl.compare(0, 4, "http") == 0)
pulsar-client-cpp/lib/ClientImpl.cc
Outdated
producerIdGenerator_(0), | ||
consumerIdGenerator_(0), | ||
requestIdGenerator_(0) { | ||
LogUtils::init(clientConfiguration.getLogConfFilePath()); | ||
if ( !strncmp(serviceUrl_.c_str(), "http", 4)) { | ||
LOG_DEBUG("Using HTTP Lookup"); | ||
lookupServicePtr_ = boost::make_shared<HTTPLookupService>(boost::ref(serviceUrl_), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boost::cref
const static std::string V2_PATH = "/lookup/v2/destination/"; | ||
const static std::string PARTITION_PATH = "/admin/persistent/"; | ||
const static int MAX_HTTP_REDIRECTS = 20; | ||
const static char SEPARATOR = '/'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think the use of a constant for '/' just obfuscates the code below.
pulsar-client-cpp/lib/HTTPWrapper.cc
Outdated
os << *iter << "\r\n"; | ||
iter++; | ||
} | ||
os << request.content << "\r\n"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need "\r\n" before the request body and you shouldn't add a "\r\n" after the body, so:
os << "\r\n" << request.content;
pulsar-client-cpp/lib/HTTPWrapper.cc
Outdated
: executorServiceProviderPtr_(executorServiceProviderPtr){ | ||
} | ||
|
||
std::string HTTPWrapper::getHTTPMethodName(const Request::Method& method) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it'd be best to avoid these temporaries.
const std::string& ...
switch (method) {
case Request::GET: { static const std::string s("GET"); return s; }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not making this change since if we don't copy the returned result the underlying object is deleted
#include
using namespace std;
const int& get1() {
int i = 1;
return i;
}
int main() {
int a = get1();
// int &b = get1();// compilation error since we drop the const
const int &c = get1();
cout<<a<<endl;
cout<<c<<endl;
}
./a.out
1
32767
LOG_DEBUG("HTTPLookupService::callback response = " << response); | ||
if (response.retCode != HTTPWrapper::Response::Success) { | ||
if (response.statusCode == 401) { | ||
// 401 means unauthorized |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
401 means unauthenticated, 403 is unauthorized.
const std::vector<std::string>& responseHeaders = response.headers; | ||
std::vector<std::string>::const_iterator iter = responseHeaders.begin(); | ||
while (iter != responseHeaders.end()) { | ||
if (!strncmp((*iter).c_str(), "Location", 8)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another case where we should prefer std::string::compare to strncmp
pulsar-client-cpp/lib/HTTPWrapper.cc
Outdated
// eof occurs but responseStreamPtr_ not necessarily emptys | ||
LOG_DEBUG("EOF occured"); | ||
std::istream inputStream(responseStreamPtr_.get()); | ||
inputStream >> response_.content; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the intent to read all of the stream content and store into response_.content? this only reads the first whitespace delimited string. try this:
std::ostringstream os;
os << inputStream.rdbuf();
response_.content = os.str();
pulsar-client-cpp/lib/HTTPWrapper.cc
Outdated
callback_(shared_from_this()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing newline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
pulsar-client-cpp/lib/HTTPWrapper.cc
Outdated
static const HTTPWrapper::Response EMPTY_RESPONSE = HTTPWrapper::Response(); | ||
|
||
static void removeCarriage(std::string& str) { | ||
str.erase( std::remove(str.begin(), str.end(), '\r'), str.end() ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this removes all '\r' occurrences in the string, but in all of the places you use it, there's only one at the end of the line, right?
return LookupDataResultPtr(); | ||
} | ||
const std::string defaultNotFoundString = "Url Not found"; | ||
const std::string brokerUrl = root.get("brokerUrl", defaultNotFoundString).asString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used get instead of find since the json_cpp version in yahoo env doesn't support the find API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall, couple of minor issues
@@ -32,6 +32,11 @@ find_library(LOG4CXX_LIBRARY_PATH log4cxx) | |||
find_library(CURL_LIBRARY_PATH curl) | |||
find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h) | |||
find_path(GTEST_INCLUDE_PATH gtest/gtest.h) | |||
find_library(LIB_JSON jsoncpp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also update README with the dependency list?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
// Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html | ||
curl_global_init(CURL_GLOBAL_ALL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I start 2 pulsar clients, there would be a problem. We need to have some static flag and mutex to ensure this gets executed once.
pulsar-client-cpp/lib/Url.cc
Outdated
return path_; | ||
} | ||
|
||
const std::string& Url::file() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is file in a URL?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't that be part of the path
?
@merlimat - I have handled your CR comments - can I please get a +1 Where path is "/dir/subdir/" and file is "file.html" |
adminUrl_ = lookupUrl; | ||
} | ||
// Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html | ||
Lock lock(curlGlobalMutex_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this would work either. It just protects 2 threads from calling curl_global_init()
at the same time, but not at different times. Also we need to keep track of how many Pulsar instances are created, since we can only call the curl_global_cleanup()
once the last instance is destroyed.
I think the easiest option would be to encapsulate the curl_global_init / cleanup
within a c++ object managed by a static shared_ptr
or something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you don't need a shared pointer:
static void globalInit() {
struct Init {
Init() { curl_global_init(XXX); }
~Init() { curl_global_cleanup(); }
};
// required to be thread-safe by C++11, true for GCC >= 3.4.
// FIXME: someone add a #error here for old versions of MSVC
static Init init;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this will work
class HTTPLookupService : public LookupService, public boost::enable_shared_from_this<HTTPLookupService> {
class CurlInitializer {
public:
CurlInitializer() {
// Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html
curl_global_init (CURL_GLOBAL_ALL);
}
~CurlInitializer() {
curl_global_cleanup();
}
};
static CurlInitializer curlInitializer;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
ReadStreamPtr ExecutorService::createReadStream() { | ||
return boost::make_shared<boost::asio::streambuf>(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is unused, we should remove it.
* Export client metrics to Prometheus * Added missing messagesReceived.Inc()
Fixes apache#315 The original `listeners` config's semantics is wrong that it mixed the `listeners` and `advertised.listeners` semantics of Kafka. So this PR adds a `kafkaAdvertisedListeners` as the listeners exposed to client, and only use `listeners` as the bind address. To avoid conflict with other protocol handlers, mark `listeners` as deprecated and use `kafkaListeners` instead. For convenience, this PR adds an `EndPoint` class to do the listener url parse work, which can be applied to both `kafkaListeners` and `kafkaAdvertisedListeners`. It also handles `SASL_XXX` protocols which were not handled before. And the related tests are added. The existed `KafkaApisTest#testBrokerHandleTopicMetadataRequest` could verify the `kafkaAdvertisedListeners` because the tests' `kafkaAdvertisedListeners` is `127.0.0.1:<port>` while `kafkaListeners` is `localhost:<port>`.
Motivation
Want to reintroduce HTTP Lookup in order to maintain seamless production rollout
Modifications
Couldn't use URDL hence created a HTTPWrapper over asio
Result
HTTPLookup possible.
TODO - Remaining
a. Remove mentions of YCA from the code