Skip to content

Commit

Permalink
make braft support hostname/dns.
Browse files Browse the repository at this point in the history
  * add more constructor for PeerId.
  * modify PeerId operator<.
  * use brpc naming_service_url init channel API to force resolv dns.
  * modify raft_meta log saved format when PeerId type is HostName.
  * init channels only once and save them for future use
  • Loading branch information
xiaolin310 committed Aug 28, 2023
1 parent 59c40e5 commit cfa1256
Show file tree
Hide file tree
Showing 15 changed files with 1,209 additions and 74 deletions.
134 changes: 134 additions & 0 deletions example/counter/counter_hostname_test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
cmake_minimum_required(VERSION 2.8.10)
project(counter C CXX)

option(EXAMPLE_LINK_SO "Whether examples are linked dynamically" OFF)
option(LINK_TCMALLOC "Link tcmalloc if possible" ON)

execute_process(
COMMAND bash -c "find ${CMAKE_SOURCE_DIR}/../.. -type d -path \"*output/include/braft\" | xargs dirname | xargs dirname | tr -d '\n'"
OUTPUT_VARIABLE OUTPUT_PATH
)

set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})

include(FindThreads)
include(FindProtobuf)

if (NOT PROTOBUF_PROTOC_EXECUTABLE)
get_filename_component(PROTO_LIB_DIR ${PROTOBUF_LIBRARY} DIRECTORY)
set (PROTOBUF_PROTOC_EXECUTABLE "${PROTO_LIB_DIR}/../bin/protoc")
endif()

protobuf_generate_cpp(PROTO_SRC PROTO_HEADER counter.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})

find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
if(EXAMPLE_LINK_SO)
find_library(BRPC_LIB NAMES brpc)
find_library(BRAFT_LIB NAMES braft)
else()
find_library(BRPC_LIB NAMES libbrpc.a brpc)
find_library(BRAFT_LIB NAMES libbraft.a braft)
endif()

if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
message(FATAL_ERROR "Fail to find brpc")
endif()
include_directories(${BRPC_INCLUDE_PATH})

find_path(BRAFT_INCLUDE_PATH NAMES braft/raft.h)
if ((NOT BRAFT_INCLUDE_PATH) OR (NOT BRAFT_LIB))
message (FATAL_ERROR "Fail to find braft")
endif()
include_directories(${BRAFT_INCLUDE_PATH})

find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
message(FATAL_ERROR "Fail to find gflags")
endif()
include_directories(${GFLAGS_INCLUDE_PATH})

execute_process(
COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'"
OUTPUT_VARIABLE GFLAGS_NS
)
if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE")
execute_process(
COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
OUTPUT_VARIABLE GFLAGS_NS
)
endif()

if (LINK_TCMALLOC)
find_path(GPERFTOOLS_INCLUDE_DIR NAMES gperftools/heap-profiler.h)
find_library(GPERFTOOLS_LIBRARIES NAMES tcmalloc_and_profiler)
if (GPERFTOOLS_INCLUDE_DIR AND GPERFTOOLS_LIBRARIES)
set(CMAKE_CXX_FLAGS "-DBRPC_ENABLE_CPU_PROFILER")
include_directories(${GPERFTOOLS_INCLUDE_DIR})
else ()
set (GPERFTOOLS_LIBRARIES "")
endif ()
endif ()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CPP_FLAGS} -DGFLAGS_NS=${GFLAGS_NS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
# require at least gcc 4.8
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.8)
message(FATAL_ERROR "GCC is too old, please install a newer version supporting C++11")
endif()
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
# require at least clang 3.3
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 3.3)
message(FATAL_ERROR "Clang is too old, please install a newer version supporting C++11")
endif()
else()
message(WARNING "You are using an unsupported compiler! Compilation has only been tested with Clang and GCC.")
endif()

if(CMAKE_VERSION VERSION_LESS "3.1.3")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
else()
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()

find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
find_library(LEVELDB_LIB NAMES leveldb)
if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
message(FATAL_ERROR "Fail to find leveldb")
endif()
include_directories(${LEVELDB_INCLUDE_PATH})

add_executable(counter_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
add_executable(counter_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})

set(DYNAMIC_LIB
${CMAKE_THREAD_LIBS_INIT}
${GFLAGS_LIBRARY}
${PROTOBUF_LIBRARY}
${GPERFTOOLS_LIBRARIES}
${LEVELDB_LIB}
${BRAFT_LIB}
${BRPC_LIB}
rt
ssl
crypto
dl
z
)

target_link_libraries(counter_client
"-Xlinker \"-(\""
${DYNAMIC_LIB}
"-Xlinker \"-)\"")
target_link_libraries(counter_server
"-Xlinker \"-(\""
${DYNAMIC_LIB}
"-Xlinker \"-)\"")
163 changes: 163 additions & 0 deletions example/counter/counter_hostname_test/client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <brpc/channel.h>
#include <brpc/controller.h>
#include <braft/raft.h>
#include <braft/util.h>
#include <braft/route_table.h>
#include "counter.pb.h"

DEFINE_bool(log_each_request, false, "Print log for each request");
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_int32(add_percentage, 100, "Percentage of fetch_add");
DEFINE_int64(added_by, 1, "Num added to each peer");
DEFINE_int32(thread_num, 1, "Number of threads sending requests");
DEFINE_int32(timeout_ms, 1000, "Timeout for each request");
DEFINE_string(conf, "", "Configuration of the raft group");
DEFINE_string(group, "Counter", "Id of the replication group");

bvar::LatencyRecorder g_latency_recorder("counter_client");

static void* sender(void* arg) {
while (!brpc::IsAskedToQuit()) {
braft::PeerId leader;
// Select leader of the target group from RouteTable
if (braft::rtb::select_leader(FLAGS_group, &leader) != 0) {
// Leader is unknown in RouteTable. Ask RouteTable to refresh leader
// by sending RPCs.
butil::Status st = braft::rtb::refresh_leader(
FLAGS_group, FLAGS_timeout_ms);
if (!st.ok()) {
// Not sure about the leader, sleep for a while and the ask again.
LOG(WARNING) << "Fail to refresh_leader : " << st;
bthread_usleep(FLAGS_timeout_ms * 1000L);
}
continue;
}

// Now we known who is the leader, construct Stub and then sending
// rpc
brpc::Channel channel;
if (leader.type_ == braft::PeerId::Type::EndPoint) {
if (channel.Init(leader.addr, NULL) != 0) {
LOG(ERROR) << "Fail to init channel to " << leader;
bthread_usleep(FLAGS_timeout_ms * 1000L);
continue;
}
} else {
std::string naming_service_url;
naming_service_url.append(PROTOCOL_PREFIX);
naming_service_url.append(leader.hostname_);
if (channel.Init(naming_service_url.c_str(), LOAD_BALANCER_NAME, NULL) != 0) {
LOG(ERROR) << "Fail to init channel to " << leader;
bthread_usleep(FLAGS_timeout_ms * 1000L);
continue;
}

}
example::CounterService_Stub stub(&channel);

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_timeout_ms);
// Randomly select which request we want send;
example::CounterResponse response;

if (butil::fast_rand_less_than(100) < (size_t)FLAGS_add_percentage) {
example::FetchAddRequest request;
request.set_value(FLAGS_added_by);
stub.fetch_add(&cntl, &request, &response, NULL);
} else {
example::GetRequest request;
stub.get(&cntl, &request, &response, NULL);
}
if (cntl.Failed()) {
LOG(WARNING) << "Fail to send request to " << leader
<< " : " << cntl.ErrorText();
// Clear leadership since this RPC failed.
braft::rtb::update_leader(FLAGS_group, braft::PeerId());
bthread_usleep(FLAGS_timeout_ms * 1000L);
continue;
}
if (!response.success()) {
LOG(WARNING) << "Fail to send request to " << leader
<< ", redirecting to "
<< (response.has_redirect()
? response.redirect() : "nowhere");
// Update route table since we have redirect information
braft::rtb::update_leader(FLAGS_group, response.redirect());
continue;
}
g_latency_recorder << cntl.latency_us();
if (FLAGS_log_each_request) {
LOG(INFO) << "Received response from " << leader
<< " value=" << response.value()
<< " latency=" << cntl.latency_us();
bthread_usleep(1000L * 1000L);
}
}
return NULL;
}

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
butil::AtExitManager exit_manager;

// Register configuration of target group to RouteTable
if (braft::rtb::update_configuration(FLAGS_group, FLAGS_conf) != 0) {
LOG(ERROR) << "Fail to register configuration " << FLAGS_conf
<< " of group " << FLAGS_group;
return -1;
}

std::vector<bthread_t> tids;
tids.resize(FLAGS_thread_num);
if (!FLAGS_use_bthread) {
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (pthread_create(&tids[i], NULL, sender, NULL) != 0) {
LOG(ERROR) << "Fail to create pthread";
return -1;
}
}
} else {
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (bthread_start_background(&tids[i], NULL, sender, NULL) != 0) {
LOG(ERROR) << "Fail to create bthread";
return -1;
}
}
}

while (!brpc::IsAskedToQuit()) {
sleep(1);
LOG_IF(INFO, !FLAGS_log_each_request)
<< "Sending Request to " << FLAGS_group
<< " (" << FLAGS_conf << ')'
<< " at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}

LOG(INFO) << "Counter client is going to quit";
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (!FLAGS_use_bthread) {
pthread_join(tids[i], NULL);
} else {
bthread_join(tids[i], NULL);
}
}

return 0;
}
25 changes: 25 additions & 0 deletions example/counter/counter_hostname_test/counter.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax="proto2";
package example;
option cc_generic_services = true;

message Snapshot {
required int64 value = 1;
};

message FetchAddRequest {
required int64 value = 1;
};

message CounterResponse {
required bool success = 1;
optional int64 value = 2;
optional string redirect = 3;
};

message GetRequest {
};

service CounterService {
rpc fetch_add(FetchAddRequest) returns (CounterResponse);
rpc get(GetRequest) returns (CounterResponse);
};
60 changes: 60 additions & 0 deletions example/counter/counter_hostname_test/run_client.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/bash

# Copyright (c) 2018 Baidu.com, Inc. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# source shflags from current directory
mydir="${BASH_SOURCE%/*}"
if [[ ! -d "$mydir" ]]; then mydir="$PWD"; fi
. $mydir/../shflags


# define command-line flags
DEFINE_boolean clean 1 'Remove old "runtime" dir before running'
DEFINE_integer add_percentage 100 'Percentage of fetch_add operation'
DEFINE_integer bthread_concurrency '8' 'Number of worker pthreads'
DEFINE_integer server_port 8100 "Port of the first server"
DEFINE_integer server_num '3' 'Number of servers'
DEFINE_integer thread_num 3 'Number of sending thread'
DEFINE_string crash_on_fatal 'true' 'Crash on fatal log'
DEFINE_string log_each_request 'true' 'Print log for each request'
DEFINE_string valgrind 'false' 'Run in valgrind'
DEFINE_string use_bthread "true" "Use bthread to send request"

FLAGS "$@" || exit 1

# hostname prefers ipv6
# IP=`hostname -i | awk '{print $NF}'`
IP=`hostname`

if [ "$FLAGS_valgrind" == "true" ] && [ $(which valgrind) ] ; then
VALGRIND="valgrind --tool=memcheck --leak-check=full"
fi

raft_peers=""
for ((i=0; i<$FLAGS_server_num; ++i)); do
raft_peers="${raft_peers}${IP}:$((${FLAGS_server_port}+i)):0,"
done

export TCMALLOC_SAMPLE_PARAMETER=524288

${VALGRIND} ./counter_client \
--add_percentage=${FLAGS_add_percentage} \
--bthread_concurrency=${FLAGS_bthread_concurrency} \
--conf="${raft_peers}" \
--crash_on_fatal_log=${FLAGS_crash_on_fatal} \
--log_each_request=${FLAGS_log_each_request} \
--thread_num=${FLAGS_thread_num} \
--use_bthread=${FLAGS_use_bthread} \

Loading

0 comments on commit cfa1256

Please sign in to comment.