Skip to content

Commit

Permalink
Make envoy_esp talk to Mixer. (envoyproxy#29)
Browse files Browse the repository at this point in the history
* Make enovy_esp can talk to Mixer.

* Fix file format.

* Add mixer_options in server_config.

* Address comments.

* Update log info.

* Add TE to grpc headers.
  • Loading branch information
chowchow316 authored Jan 5, 2017
1 parent e941522 commit 4e25948
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 28 deletions.
1 change: 1 addition & 0 deletions contrib/endpoints/src/api_manager/context/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ cc_library(
"//contrib/endpoints/src/api_manager/auth",
"//contrib/endpoints/src/api_manager/auth:service_account_token",
"//contrib/endpoints/src/api_manager/cloud_trace",
"//contrib/endpoints/src/api_manager/mixer",
"//contrib/endpoints/src/api_manager/service_control",
"//contrib/endpoints/src/api_manager/utils",
"//external:cc_wkt_protos",
Expand Down
15 changes: 11 additions & 4 deletions contrib/endpoints/src/api_manager/context/service_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
////////////////////////////////////////////////////////////////////////////////
//
#include "contrib/endpoints/src/api_manager/context/service_context.h"
#include "contrib/endpoints/src/api_manager/mixer/mixer.h"

#include "contrib/endpoints/src/api_manager/service_control/aggregated.h"

Expand Down Expand Up @@ -85,10 +86,16 @@ const std::string& ServiceContext::project_id() const {
}

std::unique_ptr<service_control::Interface> ServiceContext::CreateInterface() {
return std::unique_ptr<service_control::Interface>(
service_control::Aggregated::Create(config_->service(),
config_->server_config(), env_.get(),
&service_account_token_));
if (config_->server_config() &&
config_->server_config()->has_mixer_options()) {
return std::unique_ptr<service_control::Interface>(
mixer::Mixer::Create(env_.get(), config_.get()));
} else {
return std::unique_ptr<service_control::Interface>(
service_control::Aggregated::Create(
config_->service(), config_->server_config(), env_.get(),
&service_account_token_));
}
}

std::unique_ptr<cloud_trace::Aggregator>
Expand Down
22 changes: 14 additions & 8 deletions contrib/endpoints/src/api_manager/mixer/mixer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ namespace api_manager {
namespace mixer {
namespace {

const char kMixerServiceName[] = "istio.mixer.v1.Mixer";

enum AttributeIndex {
ATTR_SERVICE_NAME = 0,
ATTR_PEER_ID,
Expand Down Expand Up @@ -133,8 +135,8 @@ void CovertToPb(const service_control::ReportRequestInfo& info,

} // namespace

Mixer::Mixer(ApiManagerEnvInterface* env, const std::string& service_name)
: env_(env), request_index_(0), service_name_(service_name) {}
Mixer::Mixer(ApiManagerEnvInterface* env, const Config* config)
: env_(env), request_index_(0), config_(config) {}

Mixer::~Mixer() {}

Expand Down Expand Up @@ -162,13 +164,15 @@ Status Mixer::Report(const service_control::ReportRequestInfo& info) {

::istio::mixer::v1::ReportRequest request;
request.set_request_index(++request_index_);
CovertToPb(info, service_name_, request.mutable_attribute_update());
CovertToPb(info, config_->service_name(), request.mutable_attribute_update());
env_->LogInfo(std::string("Send Report: ") + request.DebugString());

std::string request_body;
request.SerializeToString(&request_body);

grpc_request->set_server("mixer_server")
grpc_request
->set_server(config_->server_config()->mixer_options().mixer_server())
.set_service(kMixerServiceName)
.set_method("Report")
.set_body(request_body);

Expand Down Expand Up @@ -202,13 +206,15 @@ void Mixer::Check(

::istio::mixer::v1::CheckRequest request;
request.set_request_index(++request_index_);
CovertToPb(info, service_name_, request.mutable_attribute_update());
CovertToPb(info, config_->service_name(), request.mutable_attribute_update());
env_->LogInfo(std::string("Send Check: ") + request.DebugString());

std::string request_body;
request.SerializeToString(&request_body);

grpc_request->set_server("mixer_server")
grpc_request
->set_server(config_->server_config()->mixer_options().mixer_server())
.set_service(kMixerServiceName)
.set_method("Check")
.set_body(request_body);

Expand All @@ -220,8 +226,8 @@ Status Mixer::GetStatistics(service_control::Statistics* esp_stat) const {
}

service_control::Interface* Mixer::Create(ApiManagerEnvInterface* env,
const std::string& service_name) {
return new Mixer(env, service_name);
const Config* config) {
return new Mixer(env, config);
}

} // namespace mixer
Expand Down
8 changes: 4 additions & 4 deletions contrib/endpoints/src/api_manager/mixer/mixer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define API_MANAGER_MIXER_MIXER_H_

#include "contrib/endpoints/include/api_manager/env_interface.h"
#include "contrib/endpoints/src/api_manager/config.h"
#include "contrib/endpoints/src/api_manager/service_control/interface.h"

namespace google {
Expand All @@ -26,7 +27,7 @@ namespace mixer {
class Mixer : public service_control::Interface {
public:
static service_control::Interface* Create(ApiManagerEnvInterface* env,
const std::string& service_name);
const Config* config);

virtual ~Mixer();

Expand All @@ -46,14 +47,13 @@ class Mixer : public service_control::Interface {

private:
// The constructor.
Mixer(ApiManagerEnvInterface* env, const std::string& service_name);
Mixer(ApiManagerEnvInterface* env, const Config* config);

// The Api Manager environment interface.
ApiManagerEnvInterface* env_;
int64_t request_index_;

// The service name.
std::string service_name_;
const Config* config_;
};

} // namespace mixer
Expand Down
9 changes: 9 additions & 0 deletions contrib/endpoints/src/api_manager/proto/server_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ message ServerConfig {
// Server config used for API authentication
ApiAuthenticationConfig api_authentication_config = 5;

// Mixer option flag. If not present, default to use service_control. When
// Envoy/esp talks to Mixer, has to specify this field.
MixerOptions mixer_options = 6;

// Experimental flags
Experimental experimental = 999;
}
Expand Down Expand Up @@ -139,6 +143,11 @@ message ApiAuthenticationConfig {
bool force_disable = 1;
}

message MixerOptions {
// For envoy, it is the cluster name for mixer server.
string mixer_server = 1;
}

message Experimental {
// Disable timed printouts of ESP status to the error log.
bool disable_log_status = 1;
Expand Down
7 changes: 7 additions & 0 deletions contrib/endpoints/src/api_manager/server_config_proto_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ api_authentication_config {
experimental {
disable_log_status: false
}
mixer_options {
mixer_server: "mixer_server"
}
)";

TEST(ServerConfigProto, ServerConfigFromString) {
Expand Down Expand Up @@ -121,6 +125,9 @@ TEST(ServerConfigProto, ServerConfigFromString) {

// Check disable_log_status
EXPECT_EQ(false, server_config.experimental().disable_log_status());

// Check mixer options
EXPECT_EQ("mixer_server", server_config.mixer_options().mixer_server());
}

TEST(ServerConfigProto, ValidateSampleServerConfig) {
Expand Down
55 changes: 55 additions & 0 deletions src/envoy/prototype/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

This Proxy will use Envoy and talk to Mixer server.


## Build Mixer server

* Follow https://github.com/istio/mixer/blob/master/doc/devel/development.md to set up environment, and build via:

```
cd $(ISTIO)/mixer
bazel build ...:all
```

## Build Envoy proxy

* Build target envoy_esp:

```
bazel build //src/envoy/prototype:envoy_esp
```

## How to run it

* Start mixer server. In mixer folder run:

```
bazel-bin/cmd/server/mixs server
```

The server will run at port 9091

* Start backend Echo server. At ESP repo (https://github.com/cloudendpoints/esp)

```
cd test/echo
npm install
node echo.js
```

* Start Envoy proxy, run

```
bazel-bin/src/envoy/prototype/envoy_esp -c src/envoy/prototype/envoy-esp.conf
```

* Then issue HTTP request to proxy.

```
curl http://localhost:9090/echo?key=API-KEY -d "hello world"
```

## How to add attributes or facts

Now only some of attributes are passed to mixer. If you want to add more attributes, you can
modify this [file](https://gcp-apis.git.corp.google.com/esp/+/test/envoy-mixer/src/api_manager/mixer/mixer.cc).
85 changes: 78 additions & 7 deletions src/envoy/prototype/api_manager_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
#include "common/http/headers.h"
#include "common/http/message_impl.h"
#include "envoy/event/timer.h"
#include "google/protobuf/stubs/status.h"
#include "source/common/grpc/common.h"

using ::google::api_manager::utils::Status;
using ::google::protobuf::util::error::Code;

namespace Http {
namespace ApiManager {
Expand All @@ -13,7 +18,7 @@ void Http::ApiManager::Env::Log(LogLevel level, const char *message) {
log().debug("{}", message);
break;
case LogLevel::INFO:
log().debug("{}", message);
log().info("{}", message);
break;
case LogLevel::WARNING:
log().warn("{}", message);
Expand Down Expand Up @@ -60,6 +65,8 @@ std::unique_ptr<google::api_manager::PeriodicTimer> Env::StartPeriodicTimer(
}

static const LowerCaseString kApiManagerUrl("x-api-manager-url");
static const LowerCaseString kGrpcTEKey("te");
static const std::string kGrpcTEValue("trailers");

class HTTPRequest : public Http::Message {
private:
Expand Down Expand Up @@ -99,13 +106,14 @@ class HTTPRequest : public Http::Message {
virtual std::string bodyAsString() override { return ""; }
};

class RequestCallbacks : public AsyncClient::Callbacks {
class HTTPRequestCallbacks : public AsyncClient::Callbacks {
private:
std::unique_ptr<google::api_manager::HTTPRequest> request_;
std::unique_ptr<AsyncClient::Request> sent_request_;

public:
RequestCallbacks(std::unique_ptr<google::api_manager::HTTPRequest> &&request)
HTTPRequestCallbacks(
std::unique_ptr<google::api_manager::HTTPRequest> &&request)
: request_(std::move(request)) {}
virtual void onSuccess(MessagePtr &&response) override {
google::api_manager::utils::Status status(
Expand All @@ -121,28 +129,91 @@ class RequestCallbacks : public AsyncClient::Callbacks {
delete this;
}
virtual void onFailure(AsyncClient::FailureReason reason) override {
google::api_manager::utils::Status status =
google::api_manager::utils::Status::OK;
google::api_manager::utils::Status status(-1,
"Cannot connect to HTTP server.");
std::map<std::string, std::string> headers;
request_->OnComplete(status, std::move(headers), "");
delete this;
}
};

namespace {
// Copy the code here from envoy/grpc/common.cc
Buffer::InstancePtr SerializeGrpcBody(const std::string &body_str) {
// http://www.grpc.io/docs/guides/wire.html
Buffer::InstancePtr body(new Buffer::OwnedImpl());
uint8_t compressed = 0;
body->add(&compressed, sizeof(compressed));
uint32_t size = htonl(body_str.size());
body->add(&size, sizeof(size));
body->add(body_str);
return body;
}
Http::MessagePtr PrepareGrpcHeaders(const std::string &upstream_cluster,
const std::string &service_full_name,
const std::string &method_name) {
Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().insertMethod().value(
Http::Headers::get().MethodValues.Post);
message->headers().insertPath().value(
fmt::format("/{}/{}", service_full_name, method_name));
message->headers().insertHost().value(upstream_cluster);
message->headers().insertContentType().value(Grpc::Common::GRPC_CONTENT_TYPE);
message->headers().addStatic(kGrpcTEKey, kGrpcTEValue);
return message;
}
} // annoymous namespace

class GrpcRequestCallbacks : public AsyncClient::Callbacks {
private:
Env *env_;
std::unique_ptr<google::api_manager::GRPCRequest> request_;

public:
GrpcRequestCallbacks(
Env *env, std::unique_ptr<google::api_manager::GRPCRequest> &&request)
: env_(env), request_(std::move(request)) {}
virtual void onSuccess(MessagePtr &&response) override {
google::api_manager::utils::Status status(
std::stoi(response->headers().Status()->value().c_str()), "");
Grpc::Common::validateResponse(*response);
env_->LogInfo("pass validate");
// remove 5 bytes of grpc header
response->body()->drain(5);
request_->OnComplete(status, response->bodyAsString());
delete this;
}
virtual void onFailure(AsyncClient::FailureReason reason) override {
google::api_manager::utils::Status status(-1,
"Cannot connect to gRPC server.");
request_->OnComplete(status, "");
delete this;
}
};

void Env::RunHTTPRequest(
std::unique_ptr<google::api_manager::HTTPRequest> request) {
auto &client = cm_.httpAsyncClientForCluster("api_manager");

MessagePtr message{new HTTPRequest(request.get())};
RequestCallbacks *callbacks = new RequestCallbacks(std::move(request));
HTTPRequestCallbacks *callbacks =
new HTTPRequestCallbacks(std::move(request));
client.send(
std::move(message), *callbacks,
Optional<std::chrono::milliseconds>(std::chrono::milliseconds(10000)));
}

void Env::RunGRPCRequest(
std::unique_ptr<google::api_manager::GRPCRequest> request) {
// TODO: send grpc request.
auto &client = cm_.httpAsyncClientForCluster(request->server());

Http::MessagePtr message =
PrepareGrpcHeaders("localhost", request->service(), request->method());
message->body(SerializeGrpcBody(request->body()));
auto callbacks = new GrpcRequestCallbacks(this, std::move(request));
client.send(
std::move(message), *callbacks,
Optional<std::chrono::milliseconds>(std::chrono::milliseconds(10000)));
}
}
}
Loading

0 comments on commit 4e25948

Please sign in to comment.