Skip to content

Commit

Permalink
[fuzzer] Eliminate use of mock_endpoint in server fuzzer (grpc#35625)
Browse files Browse the repository at this point in the history
Instead, connect a real client, which gives us the ability to exercise the listener portion of the code too.

Closes grpc#35625

COPYBARA_INTEGRATE_REVIEW=grpc#35625 from ctiller:shush-connect b8f7899
PiperOrigin-RevId: 600836565
  • Loading branch information
ctiller authored and copybara-github committed Jan 23, 2024
1 parent af95eaf commit 19f1b39
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 41 deletions.
4 changes: 4 additions & 0 deletions include/grpc/support/port_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,11 +671,15 @@ typedef unsigned __int64 uint64_t;
#endif

#ifndef GRPC_REINITIALIZES
#if defined(__clang__)
#if GPR_HAS_CPP_ATTRIBUTE(clang::reinitializes)
#define GRPC_REINITIALIZES [[clang::reinitializes]]
#else
#define GRPC_REINITIALIZES
#endif
#else
#define GRPC_REINITIALIZES
#endif
#endif

#ifndef GPR_HAS_ATTRIBUTE
Expand Down
2 changes: 2 additions & 0 deletions src/core/lib/iomgr/tcp_server_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ static grpc_error_handle CreateEventEngineListener(
} else {
EventEngine::Listener::AcceptCallback accept_cb =
[s](std::unique_ptr<EventEngine::Endpoint> ep, MemoryAllocator) {
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
s->on_accept_cb(s->on_accept_cb_arg,
grpc_event_engine::experimental::
grpc_event_engine_endpoint_create(std::move(ep)),
Expand Down
2 changes: 1 addition & 1 deletion test/core/end2end/fuzzers/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ grpc_cc_library(
"fuzzer_input_proto",
"//:chttp2_frame",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/util:fuzzing_channel_args",
"//test/core/util:grpc_test_util_base",
],
)
Expand Down Expand Up @@ -136,7 +137,6 @@ grpc_proto_fuzzer(
"//:grpc",
"//src/core:channel_args",
"//test/core/util:fuzz_config_vars",
"//test/core/util:fuzzing_channel_args",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],
Expand Down
3 changes: 3 additions & 0 deletions test/core/end2end/fuzzers/fuzzer_input.proto
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ message InputSegments {
}

message NetworkInput {
int32 connect_delay_ms = 3;
int32 connect_timeout_ms = 4;
grpc.testing.FuzzingChannelArgs endpoint_config = 5;
oneof value {
bytes single_read_bytes = 1;
InputSegments input_segments = 2;
Expand Down
145 changes: 145 additions & 0 deletions test/core/end2end/fuzzers/network_input.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,20 @@

#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
#include "test/core/util/mock_endpoint.h"

using grpc_event_engine::experimental::EventEngine;

namespace grpc_core {

namespace {
Expand Down Expand Up @@ -307,4 +314,142 @@ Duration ScheduleReads(
});
return Duration::Milliseconds(delay + 2);
}

namespace {

void ReadForever(std::shared_ptr<EventEngine::Endpoint> ep) {
bool finished;
do {
auto buffer =
std::make_unique<grpc_event_engine::experimental::SliceBuffer>();
auto buffer_ptr = buffer.get();
finished = ep->Read(
[ep, buffer = std::move(buffer)](absl::Status status) mutable {
ExecCtx exec_ctx;
if (!status.ok()) return;
ReadForever(std::move(ep));
},
buffer_ptr, nullptr);
} while (finished);
}

void ScheduleWritesForReads(
std::shared_ptr<EventEngine::Endpoint> ep,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
std::vector<QueuedRead> schedule) {
class Scheduler {
public:
Scheduler(std::shared_ptr<EventEngine::Endpoint> ep,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
std::vector<QueuedRead> schedule)
: ep_(std::move(ep)),
event_engine_(event_engine),
schedule_(std::move(schedule)),
it_(schedule_.begin()) {
ScheduleNext();
}

private:
void ScheduleNext() {
if (it_ == schedule_.end()) {
delete this;
return;
}
event_engine_->RunAfterExactly(
Duration::Milliseconds(it_->delay_ms - delay_consumed_),
[this]() mutable {
ExecCtx exec_ctx;
delay_consumed_ = it_->delay_ms;
writing_.Clear();
writing_.Append(
grpc_event_engine::experimental::internal::SliceCast<
grpc_event_engine::experimental::Slice>(
it_->slices.JoinIntoSlice()));
if (ep_->Write(
[this](absl::Status status) {
ExecCtx exec_ctx;
FinishWrite(std::move(status));
},
&writing_, nullptr)) {
FinishWrite(absl::OkStatus());
}
});
}

void FinishWrite(absl::Status status) {
if (!status.ok()) {
it_ = schedule_.end();
} else {
++it_;
}
ScheduleNext();
}

std::shared_ptr<EventEngine::Endpoint> ep_;
grpc_event_engine::experimental::FuzzingEventEngine* event_engine_;
std::vector<QueuedRead> schedule_;
std::vector<QueuedRead>::iterator it_;
grpc_event_engine::experimental::SliceBuffer writing_;
int delay_consumed_ = 0;
};
new Scheduler(std::move(ep), event_engine, std::move(schedule));
}

} // namespace

Duration ScheduleConnection(
const fuzzer_input::NetworkInput& network_input,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
testing::FuzzingEnvironment environment, int port) {
ChannelArgs channel_args =
CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(
CreateChannelArgsFromFuzzingConfiguration(
network_input.endpoint_config(), environment)
.ToC()
.get());
auto schedule = MakeSchedule(network_input);
Duration delay = Duration::Zero();
for (const auto& q : schedule) {
delay = std::max(
delay,
Duration::Milliseconds(q.delay_ms) +
Duration::NanosecondsRoundUp(
(q.slices.Length() * event_engine->max_delay_write()).count()));
}
delay += Duration::Milliseconds(network_input.connect_delay_ms() +
network_input.connect_timeout_ms());
event_engine->RunAfterExactly(
Duration::Milliseconds(network_input.connect_delay_ms()),
[event_engine, channel_args,
connect_timeout_ms = network_input.connect_timeout_ms(),
schedule = std::move(schedule), port]() mutable {
event_engine->Connect(
[event_engine, schedule = std::move(schedule)](
absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
endpoint) mutable {
ExecCtx exec_ctx;
if (!endpoint.ok()) {
gpr_log(GPR_ERROR, "Failed to connect: %s",
endpoint.status().ToString().c_str());
return;
}
std::shared_ptr<EventEngine::Endpoint> ep =
std::move(endpoint.value());
ReadForever(ep);
ScheduleWritesForReads(std::move(ep), event_engine,
std::move(schedule));
},
grpc_event_engine::experimental::ResolvedAddressMakeWild4(port),
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
channel_args),
channel_args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryAllocator("fuzzer"),
Duration::Milliseconds(connect_timeout_ms));
});
return delay;
}

} // namespace grpc_core
6 changes: 6 additions & 0 deletions test/core/end2end/fuzzers/network_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/util/fuzzing_channel_args.h"

namespace grpc_core {

Expand All @@ -27,6 +28,11 @@ Duration ScheduleReads(
grpc_endpoint* mock_endpoint,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine);

Duration ScheduleConnection(
const fuzzer_input::NetworkInput& network_input,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
testing::FuzzingEnvironment environment, int port);

} // namespace grpc_core

#endif // GRPC_TEST_CORE_END2END_FUZZERS_NETWORK_INPUT_H
54 changes: 14 additions & 40 deletions test/core/end2end/fuzzers/server_fuzzer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,23 @@
#include "absl/types/optional.h"

#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>

#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/config.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/transport.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
#include "test/core/end2end/fuzzers/fuzzing_common.h"
#include "test/core/end2end/fuzzers/network_input.h"
#include "test/core/util/fuzz_config_vars.h"
#include "test/core/util/fuzzing_channel_args.h"
#include "test/core/util/mock_endpoint.h"

bool squelch = true;
bool leak_check = true;

static void discard_write(grpc_slice /*slice*/) {}

static void dont_log(gpr_log_func_args* /*args*/) {}

namespace grpc_core {
Expand All @@ -55,37 +44,20 @@ class ServerFuzzer final : public BasicFuzzer {
explicit ServerFuzzer(const fuzzer_input::Msg& msg)
: BasicFuzzer(msg.event_engine_actions()) {
ExecCtx exec_ctx;
UpdateMinimumRunTime(
ScheduleReads(msg.network_input(), mock_endpoint_, engine()));
grpc_server_register_completion_queue(server_, cq(), nullptr);
// TODO(ctiller): add more registered methods (one for POST, one for PUT)
grpc_server_register_method(server_, "/reg", nullptr, {}, 0);
auto* creds = grpc_insecure_server_credentials_create();
grpc_server_add_http2_port(server_, "0.0.0.0:1234", creds);
grpc_server_credentials_release(creds);
grpc_server_start(server_);
ChannelArgs channel_args =
CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(
CreateChannelArgsFromFuzzingConfiguration(
msg.channel_args(), FuzzingEnvironment{resource_quota()})
.ToC()
.get());
Transport* transport =
grpc_create_chttp2_transport(channel_args, mock_endpoint_, false);
transport_setup_ok_ =
Server::FromC(server_)
->SetupTransport(transport, nullptr, channel_args, nullptr)
.ok();
if (transport_setup_ok_) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
DestroyServer();
}
UpdateMinimumRunTime(
ScheduleConnection(msg.network_input(), engine(),
FuzzingEnvironment{resource_quota()}, 1234));
}

~ServerFuzzer() { GPR_ASSERT(server_ == nullptr); }

bool transport_setup_ok() const { return transport_setup_ok_; }

private:
Result CreateChannel(
const api_fuzzer::CreateChannel& /* create_channel */) override {
Expand All @@ -104,9 +76,7 @@ class ServerFuzzer final : public BasicFuzzer {
grpc_server* server() override { return server_; }
grpc_channel* channel() override { return nullptr; }

grpc_endpoint* mock_endpoint_ = grpc_mock_endpoint_create(discard_write);
grpc_server* server_ = grpc_server_create(nullptr, nullptr);
bool transport_setup_ok_ = false;
};

} // namespace testing
Expand All @@ -116,9 +86,13 @@ DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) {
gpr_set_log_function(dont_log);
}
static const int once = []() {
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);
return 42;
}();
GPR_ASSERT(once == 42); // avoid unused variable warning
grpc_core::ApplyFuzzConfigVars(msg.config_vars());
grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
grpc_core::testing::ServerFuzzer server_fuzzer(msg);
if (!server_fuzzer.transport_setup_ok()) return;
server_fuzzer.Run(msg.api_actions());
grpc_core::testing::ServerFuzzer(msg).Run(msg.api_actions());
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ class FuzzingEventEngine : public EventEngine {
// each test.
void UnsetGlobalHooks() ABSL_LOCKS_EXCLUDED(mu_);

Duration max_delay_write() const {
return max_delay_[static_cast<int>(RunType::kWrite)];
}

private:
enum class RunType {
kWrite,
Expand Down

0 comments on commit 19f1b39

Please sign in to comment.