diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt index 26d88c41e1a95..7907acbaaf9bd 100644 --- a/pulsar-client-cpp/CMakeLists.txt +++ b/pulsar-client-cpp/CMakeLists.txt @@ -32,6 +32,11 @@ find_library(LOG4CXX_LIBRARY_PATH log4cxx) find_library(CURL_LIBRARY_PATH curl) find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h) find_path(GTEST_INCLUDE_PATH gtest/gtest.h) +find_library(LIB_JSON jsoncpp) + +if (NOT LIB_JSON) + find_library(LIB_JSON json_cpp) +endif (NOT LIB_JSON) set(ADDITIONAL_LIBRARIES $ENV{PULSAR_ADDITIONAL_LIBRARIES}) link_directories( $ENV{PULSAR_ADDITIONAL_LIBRARY_PATH} ) @@ -56,6 +61,7 @@ set(COMMON_LIBS ${LOG4CXX_LIBRARY_PATH} ${CURL_LIBRARY_PATH} ${ADDITIONAL_LIBRARIES} + ${LIB_JSON} ) link_directories(${CMAKE_BINARY_DIR}/lib) diff --git a/pulsar-client-cpp/README.md b/pulsar-client-cpp/README.md index 8ae73b8d10cac..3ddb301354990 100644 --- a/pulsar-client-cpp/README.md +++ b/pulsar-client-cpp/README.md @@ -12,7 +12,7 @@ https://github.com/yahoo/pulsar/tree/master/pulsar-client-cpp/examples * [Log4CXX](https://logging.apache.org/log4cxx) * LibCurl * [GTest](https://github.com/google/googletest) - + * JsonCpp ## Platforms Pulsar C++ Client Library has been tested on: diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc index e29882ff1aa4f..2434dd95d12b3 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc @@ -16,7 +16,6 @@ #include "DestinationName.h" #include "BinaryProtoLookupService.h" -#include "LogUtils.h" #include "SharedBuffer.h" #include diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h index ad448ec3502d2..f81684de06bed 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h @@ -18,22 +18,16 @@ #define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_ #include - -#include #include -#include #include "ConnectionPool.h" -#include "DestinationName.h" -#include "Future.h" -#include "LookupDataResult.h" #include "Backoff.h" - +#include #pragma GCC visibility push(default) namespace pulsar { class LookupDataResult; -class BinaryProtoLookupService { +class BinaryProtoLookupService : public LookupService { public: /* * constructor @@ -70,7 +64,7 @@ class BinaryProtoLookupService { uint64_t newRequestId(); }; - +typedef boost::shared_ptr BinaryProtoLookupServicePtr; } #pragma GCC visibility pop diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index ebd7cba2312fc..85086509c54d9 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -27,6 +27,7 @@ #include #include #include "boost/date_time/posix_time/posix_time.hpp" +#include DECLARE_LOG_OBJECT() @@ -63,11 +64,20 @@ namespace pulsar { listenerExecutorProvider_(boost::make_shared(clientConfiguration.getMessageListenerThreads())), partitionListenerExecutorProvider_(boost::make_shared(clientConfiguration.getMessageListenerThreads())), pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthenticationPtr(), poolConnections), - lookup_(pool_, serviceUrl), producerIdGenerator_(0), consumerIdGenerator_(0), requestIdGenerator_(0) { LogUtils::init(clientConfiguration.getLogConfFilePath()); + if (serviceUrl_.compare(0, 4, "http") == 0) { + LOG_DEBUG("Using HTTP Lookup"); + lookupServicePtr_ = boost::make_shared(boost::cref(serviceUrl_), + boost::cref(clientConfiguration_), + boost::cref( + clientConfiguration.getAuthenticationPtr())); + } else { + LOG_DEBUG("Using Binary Lookup"); + lookupServicePtr_ = boost::make_shared(boost::ref(pool_), boost::ref(serviceUrl)); + } } ClientImpl::~ClientImpl() { @@ -105,7 +115,7 @@ namespace pulsar { return; } } - lookup_.getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleCreateProducer, + lookupServicePtr_->getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleCreateProducer, shared_from_this(), _1, _2, dn, conf, callback)); } @@ -157,7 +167,7 @@ namespace pulsar { } } - lookup_.getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleSubscribe, + lookupServicePtr_->getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleSubscribe, shared_from_this(), _1, _2, dn, consumerName, conf, callback)); } @@ -211,7 +221,7 @@ namespace pulsar { Future ClientImpl::getConnection(const std::string& topic) { Promise promise; - lookup_.lookupAsync(topic).addListener(boost::bind(&ClientImpl::handleLookup, this, _1, _2, promise)); + lookupServicePtr_->lookupAsync(topic).addListener(boost::bind(&ClientImpl::handleLookup, this, _1, _2, promise)); return promise.getFuture(); } diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h index 1eb5ffdd19393..1fa03ce647483 100644 --- a/pulsar-client-cpp/lib/ClientImpl.h +++ b/pulsar-client-cpp/lib/ClientImpl.h @@ -108,7 +108,7 @@ class ClientImpl : public boost::enable_shared_from_this { ExecutorServiceProviderPtr listenerExecutorProvider_; ExecutorServiceProviderPtr partitionListenerExecutorProvider_; - BinaryProtoLookupService lookup_; + LookupServicePtr lookupServicePtr_; ConnectionPool pool_; uint64_t producerIdGenerator_; diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc new file mode 100644 index 0000000000000..cc74f4246e34e --- /dev/null +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -0,0 +1,200 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +DECLARE_LOG_OBJECT() + +namespace pulsar { + const static std::string V2_PATH = "/lookup/v2/destination/"; + const static std::string PARTITION_PATH = "/admin/persistent/"; + const static int MAX_HTTP_REDIRECTS = 20; + const static std::string PARTITION_METHOD_NAME = "partitions"; + const static int NUMBER_OF_LOOKUP_THREADS = 1; + + HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer; + + HTTPLookupService::HTTPLookupService(const std::string &lookupUrl, + const ClientConfiguration &clientConfiguration, + const AuthenticationPtr &authData) + : executorProvider_(boost::make_shared(NUMBER_OF_LOOKUP_THREADS)), + authenticationPtr_(authData), + lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()) { + if (lookupUrl[lookupUrl.length() - 1] == '/') { + // Remove trailing '/' + adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1); + } else { + adminUrl_ = lookupUrl; + } + } + + Future HTTPLookupService::lookupAsync(const std::string &destinationName) { + LookupPromise promise; + boost::shared_ptr dn = DestinationName::get(destinationName); + if (!dn) { + LOG_ERROR("Unable to parse destination - " << destinationName); + promise.setFailed(ResultInvalidTopicName); + return promise.getFuture(); + } + + std::stringstream completeUrlStream; + completeUrlStream << adminUrl_ << V2_PATH << "persistent/" << dn->getProperty() << '/' << dn->getCluster() + << '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName(); + executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), Lookup)); + return promise.getFuture(); + } + + Future HTTPLookupService::getPartitionMetadataAsync(const DestinationNamePtr &dn) { + LookupPromise promise; + std::stringstream completeUrlStream; + completeUrlStream << adminUrl_ << PARTITION_PATH << dn->getProperty() << '/' << dn->getCluster() + << '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName() << '/' + << PARTITION_METHOD_NAME; + executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), PartitionMetaData)); + return promise.getFuture(); + } + + static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) { + ((std::string*)responseDataPtr)->append((char*)contents, size * nmemb); + return size * nmemb; + } + + void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string completeUrl, + RequestType requestType) { + CURL *handle; + CURLcode res; + std::string responseData; + + handle = curl_easy_init(); + + if(!handle) { + LOG_ERROR("Unable to curl_easy_init for url " << completeUrl); + promise.setFailed(ResultLookupError); + // No curl_easy_cleanup required since handle not initialized + return; + } + // set URL + curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str()); + + // Write callback + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback); + curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData); + + // New connection is made for each call + curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L); + curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L); + + // Skipping signal handling - results in timeouts not honored during the DNS lookup + curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L); + + // Timer + curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_); + + // Redirects + curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(handle, CURLOPT_MAXREDIRS, MAX_HTTP_REDIRECTS); + + // Fail if HTTP return code >=400 + curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L); + + // Authorization data + AuthenticationDataPtr authDataContent; + Result authResult = authenticationPtr_->getAuthData(authDataContent); + if (authResult != ResultOk) { + LOG_ERROR("All Authentication methods should have AuthenticationData and return true on getAuthData for url " << completeUrl); + promise.setFailed(authResult); + curl_easy_cleanup(handle); + return; + } + struct curl_slist *list = NULL; + if (authDataContent->hasDataForHttp()) { + list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str()); + } + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list); + + // Make get call to server + res = curl_easy_perform(handle); + + // Free header list + curl_slist_free_all(list); + + switch(res) { + case CURLE_OK: + LOG_DEBUG("Response received successfully for url " << completeUrl); + promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) : parseLookupData(responseData)); + break; + case CURLE_COULDNT_CONNECT: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_HTTP_RETURNED_ERROR: + LOG_ERROR("Response failed for url "<(); + lookupDataResultPtr->setBrokerUrl(brokerUrl); + lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl); + return lookupDataResultPtr; + } +} diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h b/pulsar-client-cpp/lib/HTTPLookupService.h new file mode 100644 index 0000000000000..1474908f27dd0 --- /dev/null +++ b/pulsar-client-cpp/lib/HTTPLookupService.h @@ -0,0 +1,61 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_CPP_HTTPLOOKUPSERVICE_H +#define PULSAR_CPP_HTTPLOOKUPSERVICE_H + +#include +#include +#include +#include +#include +#include +#include + +namespace pulsar { + class HTTPLookupService : public LookupService, public boost::enable_shared_from_this { + class CurlInitializer { + public: + CurlInitializer() { + // Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html + curl_global_init (CURL_GLOBAL_ALL); + } + ~CurlInitializer() { + curl_global_cleanup(); + } + }; + static CurlInitializer curlInitializer; + enum RequestType {Lookup, PartitionMetaData}; + typedef Promise LookupPromise; + ExecutorServiceProviderPtr executorProvider_; + std::string adminUrl_; + AuthenticationPtr authenticationPtr_; + int lookupTimeoutInSeconds_; + + static LookupDataResultPtr parsePartitionData(const std::string&); + static LookupDataResultPtr parseLookupData(const std::string&); + void sendHTTPRequest(LookupPromise, const std::string, RequestType); + public: + HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&); + + Future lookupAsync(const std::string&); + + Future getPartitionMetadataAsync(const DestinationNamePtr&); + }; + +} + +#endif //PULSAR_CPP_HTTPLOOKUPSERVICE_H diff --git a/pulsar-client-cpp/lib/LookupDataResult.h b/pulsar-client-cpp/lib/LookupDataResult.h index 855fa153e29a9..63dda6f163578 100644 --- a/pulsar-client-cpp/lib/LookupDataResult.h +++ b/pulsar-client-cpp/lib/LookupDataResult.h @@ -17,6 +17,8 @@ #ifndef _PULSAR_LOOKUP_DATA_RESULT_HEADER_ #define _PULSAR_LOOKUP_DATA_RESULT_HEADER_ #include +#include +#include namespace pulsar { class LookupDataResult; diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h new file mode 100644 index 0000000000000..4c5a395af98ca --- /dev/null +++ b/pulsar-client-cpp/lib/LookupService.h @@ -0,0 +1,45 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_CPP_LOOKUPSERVICE_H +#define PULSAR_CPP_LOOKUPSERVICE_H + +#include +#include +#include +#include +#include + +namespace pulsar { +class LookupService { +public: + /* + * @param destinationName - topic name + * + * Looks up the owner broker for the given destination name + */ + virtual Future lookupAsync(const std::string& destinationName) = 0; + + /* + * @param dn - pointer to destination (topic) name + * + * Gets Partition metadata + */ + virtual Future getPartitionMetadataAsync(const DestinationNamePtr& dn) = 0; +}; +typedef boost::shared_ptr LookupServicePtr; +} +#endif //PULSAR_CPP_LOOKUPSERVICE_H diff --git a/pulsar-client-cpp/lib/Url.cc b/pulsar-client-cpp/lib/Url.cc index 09803be6512d0..f28dce09df55b 100644 --- a/pulsar-client-cpp/lib/Url.cc +++ b/pulsar-client-cpp/lib/Url.cc @@ -17,8 +17,7 @@ #include "Url.h" #include -#include -#include +#include namespace pulsar { @@ -53,6 +52,10 @@ bool Url::parse(const std::string& urlStr, Url& url) { url.protocol_ = std::string(groups[1].first, groups[1].second); url.host_ = std::string(groups[2].first, groups[2].second); std::string portStr(groups[3].first, groups[3].second); + url.pathWithoutFile_ = std::string(groups[4].first, groups[4].second); + url.file_ = std::string(groups[5].first, groups[5].second); + url.parameter_ = std::string(groups[6].first, groups[6].second); + url.path_ = url.pathWithoutFile_ + url.file_; if (!portStr.empty()) { url.port_ = atoi(groups[3].first); @@ -81,4 +84,26 @@ const int Url::port() const { return port_; } +const std::string& Url::path() const { + return path_; +} + +const std::string& Url::pathWithoutFile() const { + return pathWithoutFile_; +} + +const std::string& Url::file() const { + return file_; +} + +const std::string& Url::parameter() const { + return parameter_; +} + +std::ostream & operator<<(std::ostream &os, const Url& obj) { + os << "Url [Host = " << obj.host() << ", Protocol = " << obj.protocol() + << ", Port = " << obj.port() << "]"; + return os; +} + } // pulsar diff --git a/pulsar-client-cpp/lib/Url.h b/pulsar-client-cpp/lib/Url.h index e922a9b887738..f695d4c38a231 100644 --- a/pulsar-client-cpp/lib/Url.h +++ b/pulsar-client-cpp/lib/Url.h @@ -33,11 +33,19 @@ class Url { const std::string& protocol() const; const std::string& host() const; const int port() const; - + const std::string& path() const; + const std::string& pathWithoutFile() const; + const std::string& file() const; + const std::string& parameter() const; + friend std::ostream& operator<<(std::ostream &os, const Url& obj); private: std::string protocol_; std::string host_; int port_; + std::string path_; + std::string pathWithoutFile_; + std::string file_; + std::string parameter_; }; } // pulsar diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc index b1bfecba04fd0..02d05e6c48266 100644 --- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc +++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc @@ -37,7 +37,7 @@ DECLARE_LOG_OBJECT(); using namespace pulsar; -static std::string lookupUrl = "pulsar://localhost:8885"; +static std::string lookupUrl = "http://localhost:8765"; static std::string adminUrl = "http://localhost:8765/"; diff --git a/pulsar-client-cpp/tests/UrlTest.cc b/pulsar-client-cpp/tests/UrlTest.cc index 3cff28a12804b..a272e2a94bb70 100644 --- a/pulsar-client-cpp/tests/UrlTest.cc +++ b/pulsar-client-cpp/tests/UrlTest.cc @@ -62,4 +62,20 @@ TEST(UrlTest, testUrl) { ASSERT_EQ("example.com", url.host()); ASSERT_EQ("pulsar", url.protocol()); ASSERT_EQ(6650, url.port()); + + ASSERT_TRUE(Url::parse("http://env-broker3.messaging.cluster.company.com:4080/lookup/v2/destination/persistent/cmscpp/gq1/TESTNS.4/TOPIC_1490664894335_1?authoritative=false", url)); + ASSERT_EQ("http", url.protocol()); + ASSERT_EQ(4080, url.port()); + ASSERT_EQ("/lookup/v2/destination/persistent/cmscpp/gq1/TESTNS.4/TOPIC_1490664894335_1", url.path()); + ASSERT_EQ("/lookup/v2/destination/persistent/cmscpp/gq1/TESTNS.4/", url.pathWithoutFile()); + ASSERT_EQ("TOPIC_1490664894335_1", url.file()); + ASSERT_EQ("?authoritative=false", url.parameter()); + + ASSERT_TRUE(Url::parse("http://abc.com:8090/ads/ad/asd/TOPIC_1490664894335_1?authoritative=false,temp=true", url)); + ASSERT_EQ("http", url.protocol()); + ASSERT_EQ(8090, url.port()); + ASSERT_EQ("/ads/ad/asd/TOPIC_1490664894335_1", url.path()); + ASSERT_EQ("/ads/ad/asd/", url.pathWithoutFile()); + ASSERT_EQ("TOPIC_1490664894335_1", url.file()); + ASSERT_EQ("?authoritative=false,temp=true", url.parameter()); }