Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP lookups for c++ client lib #317

Merged
merged 37 commits into from
Apr 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f614e3c
Blocking libcurl
Mar 23, 2017
2e8eed9
SSwitched to asio
Mar 24, 2017
ad97561
Working
Mar 24, 2017
c141f8b
Working so far
Mar 25, 2017
cb716b3
Not working
Mar 27, 2017
e749680
Base working now
Mar 27, 2017
5a4183c
Redirect left
Mar 27, 2017
14e0b89
Redirection remains
Mar 27, 2017
dc5b2a6
Added Removed mentions of my name
Mar 27, 2017
1943566
Url extension
Mar 28, 2017
980e211
Merge branch 'master' of https://github.com/yahoo/pulsar into httplookup
Mar 28, 2017
a6db72d
redirects handled
Mar 28, 2017
846ea50
Corrections in redirects
Mar 28, 2017
26a9aa3
Fixed socket resetting logic
Mar 28, 2017
9c82572
Handled carriage
Mar 28, 2017
1e59e49
added more error handling
Mar 28, 2017
542725f
added more error handling
Mar 28, 2017
7a3c76a
added more error handling
Mar 28, 2017
e60aaf2
added more error handling
Mar 28, 2017
1897790
Addressed Maurice's comments
Mar 30, 2017
8d1978a
using libcurl
Apr 3, 2017
12dc686
Removed HTTPWrapper files
Apr 3, 2017
b895c65
Removed mentions of my name
Apr 3, 2017
8926955
Compilation errors resolved
Apr 3, 2017
b8684aa
Formatting issues
Apr 3, 2017
1085bd8
Fixed JSON parsing logic since find API is not present in linux JSON …
Apr 3, 2017
cabffde
CR Comments and corrected header logic
Apr 3, 2017
6c0b24b
Corrected Auth part
Apr 3, 2017
5376cd5
Corrected Auth part
Apr 3, 2017
455e7f1
changed this to shared_from_this()
Apr 4, 2017
e94fe7e
Switching on CURLOPT_NOSIGNAL
Apr 4, 2017
40a4d33
Handled Matteo's Code Review Comments
Apr 5, 2017
631334e
Merge branch 'master' of https://github.com/yahoo/pulsar into httplookup
Apr 5, 2017
764efff
Fixed compilation error
Apr 5, 2017
6c7e011
Made curl initialization happen once
Apr 5, 2017
18fe553
Rearranged the class
Apr 5, 2017
2d0d9fc
Addresses Andrew's CR comments
Apr 5, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pulsar-client-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ find_library(LOG4CXX_LIBRARY_PATH log4cxx)
find_library(CURL_LIBRARY_PATH curl)
find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h)
find_path(GTEST_INCLUDE_PATH gtest/gtest.h)
find_library(LIB_JSON jsoncpp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also update README with the dependency list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if (NOT LIB_JSON)
find_library(LIB_JSON json_cpp)
endif (NOT LIB_JSON)

set(ADDITIONAL_LIBRARIES $ENV{PULSAR_ADDITIONAL_LIBRARIES})
link_directories( $ENV{PULSAR_ADDITIONAL_LIBRARY_PATH} )
Expand All @@ -56,6 +61,7 @@ set(COMMON_LIBS
${LOG4CXX_LIBRARY_PATH}
${CURL_LIBRARY_PATH}
${ADDITIONAL_LIBRARIES}
${LIB_JSON}
)

link_directories(${CMAKE_BINARY_DIR}/lib)
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ https://github.com/yahoo/pulsar/tree/master/pulsar-client-cpp/examples
* [Log4CXX](https://logging.apache.org/log4cxx)
* LibCurl
* [GTest](https://github.com/google/googletest)

* JsonCpp
## Platforms

Pulsar C++ Client Library has been tested on:
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "DestinationName.h"
#include "BinaryProtoLookupService.h"
#include "LogUtils.h"
#include "SharedBuffer.h"

#include <boost/shared_ptr.hpp>
Expand Down
12 changes: 3 additions & 9 deletions pulsar-client-cpp/lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@
#define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_

#include <iostream>

#include <boost/shared_ptr.hpp>
#include <pulsar/Authentication.h>
#include <pulsar/Result.h>
#include "ConnectionPool.h"
#include "DestinationName.h"
#include "Future.h"
#include "LookupDataResult.h"
#include "Backoff.h"

#include <lib/LookupService.h>
#pragma GCC visibility push(default)

namespace pulsar {
class LookupDataResult;

class BinaryProtoLookupService {
class BinaryProtoLookupService : public LookupService {
public:
/*
* constructor
Expand Down Expand Up @@ -70,7 +64,7 @@ class BinaryProtoLookupService {
uint64_t newRequestId();

};

typedef boost::shared_ptr<BinaryProtoLookupService> BinaryProtoLookupServicePtr;
}

#pragma GCC visibility pop
Expand Down
18 changes: 14 additions & 4 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <sstream>
#include <openssl/sha.h>
#include "boost/date_time/posix_time/posix_time.hpp"
#include <lib/HTTPLookupService.h>

DECLARE_LOG_OBJECT()

Expand Down Expand Up @@ -63,11 +64,20 @@ namespace pulsar {
listenerExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())),
partitionListenerExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())),
pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthenticationPtr(), poolConnections),
lookup_(pool_, serviceUrl),
producerIdGenerator_(0),
consumerIdGenerator_(0),
requestIdGenerator_(0) {
LogUtils::init(clientConfiguration.getLogConfFilePath());
if (serviceUrl_.compare(0, 4, "http") == 0) {
LOG_DEBUG("Using HTTP Lookup");
lookupServicePtr_ = boost::make_shared<HTTPLookupService>(boost::cref(serviceUrl_),
boost::cref(clientConfiguration_),
boost::cref(
clientConfiguration.getAuthenticationPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
lookupServicePtr_ = boost::make_shared<BinaryProtoLookupService>(boost::ref(pool_), boost::ref(serviceUrl));
}
}

ClientImpl::~ClientImpl() {
Expand Down Expand Up @@ -105,7 +115,7 @@ namespace pulsar {
return;
}
}
lookup_.getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleCreateProducer,
lookupServicePtr_->getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleCreateProducer,
shared_from_this(), _1, _2, dn, conf, callback));
}

Expand Down Expand Up @@ -157,7 +167,7 @@ namespace pulsar {
}
}

lookup_.getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleSubscribe,
lookupServicePtr_->getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleSubscribe,
shared_from_this(), _1, _2, dn, consumerName, conf, callback));
}

Expand Down Expand Up @@ -211,7 +221,7 @@ namespace pulsar {

Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
Promise<Result, ClientConnectionWeakPtr> promise;
lookup_.lookupAsync(topic).addListener(boost::bind(&ClientImpl::handleLookup, this, _1, _2, promise));
lookupServicePtr_->lookupAsync(topic).addListener(boost::bind(&ClientImpl::handleLookup, this, _1, _2, promise));
return promise.getFuture();
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr listenerExecutorProvider_;
ExecutorServiceProviderPtr partitionListenerExecutorProvider_;

BinaryProtoLookupService lookup_;
LookupServicePtr lookupServicePtr_;
ConnectionPool pool_;

uint64_t producerIdGenerator_;
Expand Down
200 changes: 200 additions & 0 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <lib/HTTPLookupService.h>

DECLARE_LOG_OBJECT()

namespace pulsar {
const static std::string V2_PATH = "/lookup/v2/destination/";
const static std::string PARTITION_PATH = "/admin/persistent/";
const static int MAX_HTTP_REDIRECTS = 20;
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;

HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer;

HTTPLookupService::HTTPLookupService(const std::string &lookupUrl,
const ClientConfiguration &clientConfiguration,
const AuthenticationPtr &authData)
: executorProvider_(boost::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
authenticationPtr_(authData),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()) {
if (lookupUrl[lookupUrl.length() - 1] == '/') {
// Remove trailing '/'
adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1);
} else {
adminUrl_ = lookupUrl;
}
}

Future<Result, LookupDataResultPtr> HTTPLookupService::lookupAsync(const std::string &destinationName) {
LookupPromise promise;
boost::shared_ptr<DestinationName> dn = DestinationName::get(destinationName);
if (!dn) {
LOG_ERROR("Unable to parse destination - " << destinationName);
promise.setFailed(ResultInvalidTopicName);
return promise.getFuture();
}

std::stringstream completeUrlStream;
completeUrlStream << adminUrl_ << V2_PATH << "persistent/" << dn->getProperty() << '/' << dn->getCluster()
<< '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName();
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), Lookup));
return promise.getFuture();
}

Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync(const DestinationNamePtr &dn) {
LookupPromise promise;
std::stringstream completeUrlStream;
completeUrlStream << adminUrl_ << PARTITION_PATH << dn->getProperty() << '/' << dn->getCluster()
<< '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName() << '/'
<< PARTITION_METHOD_NAME;
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), PartitionMetaData));
return promise.getFuture();
}

static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) {
((std::string*)responseDataPtr)->append((char*)contents, size * nmemb);
return size * nmemb;
}

void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string completeUrl,
RequestType requestType) {
CURL *handle;
CURLcode res;
std::string responseData;

handle = curl_easy_init();

if(!handle) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
promise.setFailed(ResultLookupError);
// No curl_easy_cleanup required since handle not initialized
return;
}
// set URL
curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());

// Write callback
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);

// New connection is made for each call
curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);

// Skipping signal handling - results in timeouts not honored during the DNS lookup
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);

// Timer
curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);

// Redirects
curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(handle, CURLOPT_MAXREDIRS, MAX_HTTP_REDIRECTS);

// Fail if HTTP return code >=400
curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L);

// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("All Authentication methods should have AuthenticationData and return true on getAuthData for url " << completeUrl);
promise.setFailed(authResult);
curl_easy_cleanup(handle);
return;
}
struct curl_slist *list = NULL;
if (authDataContent->hasDataForHttp()) {
list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str());
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);

// Make get call to server
res = curl_easy_perform(handle);

// Free header list
curl_slist_free_all(list);

switch(res) {
case CURLE_OK:
LOG_DEBUG("Response received successfully for url " << completeUrl);
promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) : parseLookupData(responseData));
break;
case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultConnectError);
break;
case CURLE_READ_ERROR:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultReadError);
break;
case CURLE_OPERATION_TIMEDOUT:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultTimeout);
break;
default:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultLookupError);
break;
}
curl_easy_cleanup(handle);
}

LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) {
Json::Value root;
Json::Reader reader;
if (!reader.parse(json, root, false)) {
LOG_ERROR("Failed to parse json of Partition Metadata: " << reader.getFormatedErrorMessages()
<< "\nInput Json = " << json);
return LookupDataResultPtr();
}
LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setPartitions(root.get("partitions", 0).asInt());
return lookupDataResultPtr;
}

LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) {
Json::Value root;
Json::Reader reader;
if (!reader.parse(json, root, false)) {
LOG_ERROR("Failed to parse json : " << reader.getFormatedErrorMessages()
<< "\nInput Json = " << json);
return LookupDataResultPtr();
}
const std::string defaultNotFoundString = "Url Not found";
const std::string brokerUrl = root.get("brokerUrl", defaultNotFoundString).asString();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used get instead of find since the json_cpp version in yahoo env doesn't support the find API.

if (brokerUrl == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrl not present" << json);
return LookupDataResultPtr();
}

const std::string brokerUrlSsl = root.get("brokerUrlSsl", defaultNotFoundString).asString();
if (brokerUrlSsl == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrlSsl not present" << json);
return LookupDataResultPtr();
}

LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setBrokerUrl(brokerUrl);
lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl);
return lookupDataResultPtr;
}
}
61 changes: 61 additions & 0 deletions pulsar-client-cpp/lib/HTTPLookupService.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef PULSAR_CPP_HTTPLOOKUPSERVICE_H
#define PULSAR_CPP_HTTPLOOKUPSERVICE_H

#include <lib/LookupService.h>
#include <lib/ClientImpl.h>
#include <lib/Url.h>
#include <json/value.h>
#include <json/reader.h>
#include <boost/bind.hpp>
#include <curl/curl.h>

namespace pulsar {
class HTTPLookupService : public LookupService, public boost::enable_shared_from_this<HTTPLookupService> {
class CurlInitializer {
public:
CurlInitializer() {
// Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html
curl_global_init (CURL_GLOBAL_ALL);
}
~CurlInitializer() {
curl_global_cleanup();
}
};
static CurlInitializer curlInitializer;
enum RequestType {Lookup, PartitionMetaData};
typedef Promise<Result, LookupDataResultPtr> LookupPromise;
ExecutorServiceProviderPtr executorProvider_;
std::string adminUrl_;
AuthenticationPtr authenticationPtr_;
int lookupTimeoutInSeconds_;

static LookupDataResultPtr parsePartitionData(const std::string&);
static LookupDataResultPtr parseLookupData(const std::string&);
void sendHTTPRequest(LookupPromise, const std::string, RequestType);
public:
HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);

Future<Result, LookupDataResultPtr> lookupAsync(const std::string&);

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const DestinationNamePtr&);
};

}

#endif //PULSAR_CPP_HTTPLOOKUPSERVICE_H
Loading