Skip to content

Commit

Permalink
ARROW-5875: [FlightRPC] integration tests for Flight features
Browse files Browse the repository at this point in the history
This is only a minimal test (currently the basic auth protobuf). I have more tests, but they require other fixes, so I'd rather get the basic framework down and then add to it, over creating an enormous review.

Closes #6617 from lidavidm/flight-integration-minimal

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
lidavidm authored and wesm committed May 8, 2020
1 parent 003d30f commit 0188e45
Show file tree
Hide file tree
Showing 17 changed files with 719 additions and 88 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/flight/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS OR ARROW_BUILD_INTEGRATION)
OUTPUTS
ARROW_FLIGHT_TESTING_LIBRARIES
SOURCES
test_integration.cc
test_util.cc
DEPENDENCIES
GTest::gtest
Expand Down
115 changes: 115 additions & 0 deletions cpp/src/arrow/flight/test_integration.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/test_integration.h"
#include "arrow/flight/client_middleware.h"
#include "arrow/flight/server_middleware.h"
#include "arrow/flight/test_util.h"
#include "arrow/flight/types.h"
#include "arrow/ipc/dictionary.h"

#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace arrow {
namespace flight {

/// \brief The server for the basic auth integration test.
class AuthBasicProtoServer : public FlightServerBase {
Status DoAction(const ServerCallContext& context, const Action& action,
std::unique_ptr<ResultStream>* result) override {
// Respond with the authenticated username.
auto buf = Buffer::FromString(context.peer_identity());
*result = std::unique_ptr<ResultStream>(new SimpleResultStream({Result{buf}}));
return Status::OK();
}
};

/// Validate the result of a DoAction.
Status CheckActionResults(FlightClient* client, const Action& action,
std::vector<std::string> results) {
std::unique_ptr<ResultStream> stream;
RETURN_NOT_OK(client->DoAction(action, &stream));
std::unique_ptr<Result> result;
for (const std::string& expected : results) {
RETURN_NOT_OK(stream->Next(&result));
if (!result) {
return Status::Invalid("Action result stream ended early");
}
const auto actual = result->body->ToString();
if (expected != actual) {
return Status::Invalid("Got wrong result; expected", expected, "but got", actual);
}
}
RETURN_NOT_OK(stream->Next(&result));
if (result) {
return Status::Invalid("Action result stream had too many entries");
}
return Status::OK();
}

// The expected username for the basic auth integration test.
constexpr auto kAuthUsername = "arrow";
// The expected password for the basic auth integration test.
constexpr auto kAuthPassword = "flight";

/// \brief A scenario testing the basic auth protobuf.
class AuthBasicProtoScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
server->reset(new AuthBasicProtoServer());
options->auth_handler =
std::make_shared<TestServerBasicAuthHandler>(kAuthUsername, kAuthPassword);
return Status::OK();
}

Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

Status RunClient(std::unique_ptr<FlightClient> client) override {
Action action;
std::unique_ptr<ResultStream> stream;
std::shared_ptr<FlightStatusDetail> detail;
const auto& status = client->DoAction(action, &stream);
detail = FlightStatusDetail::UnwrapStatus(status);
// This client is unauthenticated and should fail.
if (detail == nullptr) {
return Status::Invalid("Expected UNAUTHENTICATED but got ", status.ToString());
}
if (detail->code() != FlightStatusCode::Unauthenticated) {
return Status::Invalid("Expected UNAUTHENTICATED but got ", detail->ToString());
}

auto client_handler = std::unique_ptr<ClientAuthHandler>(
new TestClientBasicAuthHandler(kAuthUsername, kAuthPassword));
RETURN_NOT_OK(client->Authenticate({}, std::move(client_handler)));
return CheckActionResults(client.get(), action, {kAuthUsername});
}
};

Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>* out) {
if (scenario_name == "auth:basic_proto") {
*out = std::make_shared<AuthBasicProtoScenario>();
return Status::OK();
}
return Status::KeyError("Scenario not found: ", scenario_name);
}

} // namespace flight
} // namespace arrow
49 changes: 49 additions & 0 deletions cpp/src/arrow/flight/test_integration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Integration test scenarios for Arrow Flight.

#include "arrow/flight/visibility.h"

#include <memory>
#include <string>

#include "arrow/flight/client.h"
#include "arrow/flight/server.h"
#include "arrow/status.h"

namespace arrow {
namespace flight {

/// \brief An integration test for Arrow Flight.
class ARROW_FLIGHT_EXPORT Scenario {
public:
virtual ~Scenario() = default;
/// \brief Set up the server.
virtual Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) = 0;
/// \brief Set up the client.
virtual Status MakeClient(FlightClientOptions* options) = 0;
/// \brief Run the scenario as the client.
virtual Status RunClient(std::unique_ptr<FlightClient> client) = 0;
};

/// \brief Get the implementation of an integration test scenario by name.
Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>* out);

} // namespace flight
} // namespace arrow
129 changes: 78 additions & 51 deletions cpp/src/arrow/flight/test_integration_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
#include "arrow/util/logging.h"

#include "arrow/flight/api.h"
#include "arrow/flight/test_integration.h"
#include "arrow/flight/test_util.h"

DEFINE_string(host, "localhost", "Server port to connect to");
DEFINE_int32(port, 31337, "Server port to connect to");
DEFINE_string(path, "", "Resource path to request");
DEFINE_string(scenario, "", "Integration test scenario to run");

namespace arrow {
namespace flight {
Expand Down Expand Up @@ -126,69 +128,94 @@ Status ConsumeFlightLocation(
return Status::OK();
}

int RunIntegrationClient() {
// Make sure the required extension types are registered.
ExtensionTypeGuard uuid_ext_guard(uuid());
ExtensionTypeGuard dict_ext_guard(dict_extension_type());

std::unique_ptr<FlightClient> client;
Location location;
ABORT_NOT_OK(Location::ForGrpcTcp(FLAGS_host, FLAGS_port, &location));
ABORT_NOT_OK(FlightClient::Connect(location, &client));

FlightDescriptor descr{FlightDescriptor::PATH, "", {FLAGS_path}};

// 1. Put the data to the server.
std::unique_ptr<ipc::internal::json::JsonReader> reader;
std::cout << "Opening JSON file '" << FLAGS_path << "'" << std::endl;
auto in_file = *io::ReadableFile::Open(FLAGS_path);
ABORT_NOT_OK(
ipc::internal::json::JsonReader::Open(default_memory_pool(), in_file, &reader));

std::shared_ptr<Schema> original_schema = reader->schema();
std::vector<std::shared_ptr<RecordBatch>> original_data;
ABORT_NOT_OK(ReadBatches(reader, &original_data));

std::unique_ptr<FlightStreamWriter> write_stream;
std::unique_ptr<FlightMetadataReader> metadata_reader;
ABORT_NOT_OK(client->DoPut(descr, original_schema, &write_stream, &metadata_reader));
ABORT_NOT_OK(UploadBatchesToFlight(original_data, *write_stream, *metadata_reader));

// 2. Get the ticket for the data.
std::unique_ptr<FlightInfo> info;
ABORT_NOT_OK(client->GetFlightInfo(descr, &info));

std::shared_ptr<Schema> schema;
ipc::DictionaryMemo dict_memo;
ABORT_NOT_OK(info->GetSchema(&dict_memo, &schema));

if (info->endpoints().size() == 0) {
std::cerr << "No endpoints returned from Flight server." << std::endl;
return -1;
class IntegrationTestScenario : public flight::Scenario {
public:
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
ARROW_UNUSED(server);
ARROW_UNUSED(options);
return Status::NotImplemented("Not implemented, see test_integration_server.cc");
}

for (const FlightEndpoint& endpoint : info->endpoints()) {
const auto& ticket = endpoint.ticket;
Status MakeClient(FlightClientOptions* options) override {
ARROW_UNUSED(options);
return Status::OK();
}

Status RunClient(std::unique_ptr<FlightClient> client) override {
// Make sure the required extension types are registered.
ExtensionTypeGuard uuid_ext_guard(uuid());
ExtensionTypeGuard dict_ext_guard(dict_extension_type());

FlightDescriptor descr{FlightDescriptor::PATH, "", {FLAGS_path}};

// 1. Put the data to the server.
std::unique_ptr<ipc::internal::json::JsonReader> reader;
std::cout << "Opening JSON file '" << FLAGS_path << "'" << std::endl;
auto in_file = *io::ReadableFile::Open(FLAGS_path);
ABORT_NOT_OK(
ipc::internal::json::JsonReader::Open(default_memory_pool(), in_file, &reader));

std::shared_ptr<Schema> original_schema = reader->schema();
std::vector<std::shared_ptr<RecordBatch>> original_data;
ABORT_NOT_OK(ReadBatches(reader, &original_data));

auto locations = endpoint.locations;
if (locations.size() == 0) {
locations = {location};
std::unique_ptr<FlightStreamWriter> write_stream;
std::unique_ptr<FlightMetadataReader> metadata_reader;
ABORT_NOT_OK(client->DoPut(descr, original_schema, &write_stream, &metadata_reader));
ABORT_NOT_OK(UploadBatchesToFlight(original_data, *write_stream, *metadata_reader));

// 2. Get the ticket for the data.
std::unique_ptr<FlightInfo> info;
ABORT_NOT_OK(client->GetFlightInfo(descr, &info));

std::shared_ptr<Schema> schema;
ipc::DictionaryMemo dict_memo;
ABORT_NOT_OK(info->GetSchema(&dict_memo, &schema));

if (info->endpoints().size() == 0) {
std::cerr << "No endpoints returned from Flight server." << std::endl;
return Status::IOError("No endpoints returned from Flight server.");
}

for (const auto& location : locations) {
std::cout << "Verifying location " << location.ToString() << std::endl;
// 3. Stream data from the server, comparing individual batches.
ABORT_NOT_OK(ConsumeFlightLocation(location, ticket, original_data));
for (const FlightEndpoint& endpoint : info->endpoints()) {
const auto& ticket = endpoint.ticket;

auto locations = endpoint.locations;
if (locations.size() == 0) {
return Status::IOError("No locations returned from Flight server.");
}

for (const auto& location : locations) {
std::cout << "Verifying location " << location.ToString() << std::endl;
// 3. Stream data from the server, comparing individual batches.
ABORT_NOT_OK(ConsumeFlightLocation(location, ticket, original_data));
}
}
return Status::OK();
}
return 0;
}
};

} // namespace flight
} // namespace arrow

int main(int argc, char** argv) {
gflags::SetUsageMessage("Integration testing client for Flight.");
gflags::ParseCommandLineFlags(&argc, &argv, true);
return arrow::flight::RunIntegrationClient();
std::shared_ptr<arrow::flight::Scenario> scenario;
if (!FLAGS_scenario.empty()) {
ARROW_CHECK_OK(arrow::flight::GetScenario(FLAGS_scenario, &scenario));
} else {
scenario = std::make_shared<arrow::flight::IntegrationTestScenario>();
}

arrow::flight::FlightClientOptions options;
std::unique_ptr<arrow::flight::FlightClient> client;

ABORT_NOT_OK(scenario->MakeClient(&options));

arrow::flight::Location location;
ABORT_NOT_OK(arrow::flight::Location::ForGrpcTcp(FLAGS_host, FLAGS_port, &location));
ABORT_NOT_OK(arrow::flight::FlightClient::Connect(location, options, &client));
return 0;
}
Loading

0 comments on commit 0188e45

Please sign in to comment.