Skip to content

Commit

Permalink
Geo: geo library support (apache#74)
Browse files Browse the repository at this point in the history
* add geo library, support set/del/search_radial/distance APIs
* integrate the library into redis proxy

Former-commit-id: cb41fabdc0711354c7760c0882598ed981a31374 [formerly 24fec41]
Former-commit-id: d97b5f76ee0dc10974d0edcbf57a0d35c9a3e3b3
  • Loading branch information
acelyc111 authored Jul 11, 2018
1 parent 7894bce commit b3f3584
Show file tree
Hide file tree
Showing 38 changed files with 3,425 additions and 78 deletions.
1 change: 1 addition & 0 deletions scripts/pack_tools.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ mkdir -p ${pack}/DSN_ROOT/lib
copy_file ./DSN_ROOT/lib/*.so* ${pack}/DSN_ROOT/lib/
copy_file ./rdsn/thirdparty/output/lib/libPoco*.so.48 ${pack}/DSN_ROOT/lib/
copy_file ./rdsn/thirdparty/output/lib/libtcmalloc.so.4 ${pack}/DSN_ROOT/lib/
copy_file ./rdsn/thirdparty/output/lib/libs2.so ${pack}/DSN_ROOT/lib/
copy_file `get_boost_lib $custom_boost_lib system` ${pack}/DSN_ROOT/lib/
copy_file `get_boost_lib $custom_boost_lib filesystem` ${pack}/DSN_ROOT/lib/
copy_file `get_stdcpp_lib $custom_gcc` ${pack}/DSN_ROOT/lib/
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../rocksdb/include)

add_subdirectory(base)
add_subdirectory(base/test)
add_subdirectory(client_lib)
add_subdirectory(server)
add_subdirectory(server/test)
add_subdirectory(shell)
add_subdirectory(geo)
add_subdirectory(redis_protocol)
add_subdirectory(test/function_test)
add_subdirectory(test/kill_test)
Expand Down
33 changes: 33 additions & 0 deletions src/base/pegasus_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <time.h>
#include <cctype>
#include <cstring>
#include <queue>
#include <boost/lexical_cast.hpp>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/utility/string_view.h>
Expand Down Expand Up @@ -41,6 +42,37 @@ int binary_compare(const T &a, const T &b)
return r;
}

template <typename elem_type, typename compare = std::less<elem_type>>
class top_n
{
public:
typedef typename std::priority_queue<elem_type, std::vector<elem_type>, compare>
data_priority_queue;

top_n(const std::list<elem_type> &data, int n)
{
for (const auto &r : data) {
_queue.emplace(r);
if (_queue.size() > n) {
_queue.pop();
}
}
}

std::list<elem_type> to()
{
std::list<elem_type> result;
while (!_queue.empty()) {
result.emplace_front(_queue.top());
_queue.pop();
}
return std::move(result);
}

protected:
data_priority_queue _queue;
};

// ----------------------------------------------------------------------
// c_escape_string()
// Copies 'src' to 'dest', escaping dangerous characters using
Expand Down Expand Up @@ -78,3 +110,4 @@ inline rocksdb::Slice to_rocksdb_slice(dsn::string_view s) { return {s.data(), s

} // namespace utils
} // namespace pegasus

20 changes: 20 additions & 0 deletions src/base/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
set(MY_PROJ_NAME base_test)
project(${MY_PROJ_NAME} C CXX)

# Source files under CURRENT project directory will be automatically included.
# You can manually set MY_PROJ_SRC to include source files under other directories.
set(MY_PROJ_SRC "")

# Search mode for source files under CURRENT project directory?
# "GLOB_RECURSE" for recursive search
# "GLOB" for non-recursive search
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_LIBS
gtest)

set(MY_BOOST_PACKAGES system filesystem)

set(MY_BINPLACES "config.ini")

dsn_add_executable()
77 changes: 77 additions & 0 deletions src/base/test/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
[apps..default]
run = true
count = 1
;network.client.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536
;network.client.RPC_CHANNEL_UDP = dsn::tools::sim_network_provider, 65536
;network.server.0.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536

[apps.mimic]
type = dsn.app.mimic
arguments =
pools = THREAD_POOL_DEFAULT
run = true
count = 1

[core]
;tool = simulator
;tool = fastrun
tool = nativerun
;toollets = tracer
;toollets = tracer, profiler, fault_injector
pause_on_start = false
cli_local = false
cli_remote = false

;aio_factory_name = dsn::tools::native_aio_provider
start_nfs = false

logging_start_level = LOG_LEVEL_DEBUG
logging_factory_name = dsn::tools::simple_logger
;logging_factory_name = dsn::tools::screen_logger
;logging_factory_name = dsn::tools::hpc_logger
logging_flush_on_exit = true

enable_default_app_mimic = true

data_dir = ./data

[tools.simple_logger]
short_header = true
fast_flush = true
max_number_of_log_files_on_disk = 10
stderr_start_level = LOG_LEVEL_ERROR

[tools.hpc_logger]
per_thread_buffer_bytes = 8192
max_number_of_log_files_on_disk = 10

[tools.simulator]
random_seed = 0

[network]
; how many network threads for network library(used by asio)
io_service_worker_count = 4

; specification for each thread pool
[threadpool..default]
worker_count = 4

[threadpool.THREAD_POOL_DEFAULT]
name = default
partitioned = false
max_input_queue_length = 1024
worker_priority = THREAD_xPRIORITY_NORMAL
worker_count = 4

[task..default]
is_trace = false
is_profile = false
allow_inline = false
fast_execution_in_network_thread = false
rpc_call_header_format = NET_HDR_DSN
rpc_call_channel = RPC_CHANNEL_TCP
rpc_timeout_milliseconds = 5000

[uri-resolver.dsn://onebox]
factory = partition_resolver_simple
arguments = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
13 changes: 13 additions & 0 deletions src/base/test/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <dsn/service_api_c.h>
#include <gtest/gtest.h>

GTEST_API_ int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
int ans = RUN_ALL_TESTS();
dsn_exit(ans);
}
51 changes: 51 additions & 0 deletions src/base/test/utils_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include "../pegasus_utils.h"
#include <gtest/gtest.h>

namespace pegasus {
namespace utils {

TEST(utils_test, top_n)
{
{
std::list<int> data({2, 3, 7, 8, 9, 0, 1, 5, 4, 6});
std::list<int> result = top_n<int>(data, 5).to();
ASSERT_EQ(result, std::list<int>({0, 1, 2, 3, 4}));
}

{
std::list<std::string> data({"2", "3", "7", "8", "9", "0", "1", "5", "4", "6"});
std::list<std::string> result = top_n<std::string>(data, 5).to();
ASSERT_EQ(result, std::list<std::string>({"0", "1", "2", "3", "4"}));
}

{
struct longer
{
inline bool operator()(const std::string &l, const std::string &r)
{
return l.length() < r.length();
}
};

std::list<std::string> data({std::string(2, 'a'),
std::string(3, 'a'),
std::string(7, 'a'),
std::string(8, 'a'),
std::string(9, 'a'),
std::string(0, 'a'),
std::string(1, 'a'),
std::string(5, 'a'),
std::string(4, 'a'),
std::string(6, 'a')});
std::list<std::string> result = top_n<std::string>(data, 5).to();
ASSERT_EQ(result,
std::list<std::string>({std::string(0, 'a'),
std::string(1, 'a'),
std::string(2, 'a'),
std::string(3, 'a'),
std::string(4, 'a')}));
}
}

} // namespace utils
} // namespace pegasus
3 changes: 3 additions & 0 deletions src/geo/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_subdirectory(lib)
add_subdirectory(test)
add_subdirectory(bench)
31 changes: 31 additions & 0 deletions src/geo/bench/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
set(MY_PROJ_NAME pegasus_geo_bench)
project(${MY_PROJ_NAME} C CXX)

# Source files under CURRENT project directory will be automatically included.
# You can manually set MY_PROJ_SRC to include source files under other directories.
set(MY_PROJ_SRC "")

# Search mode for source files under CURRENT project directory?
# "GLOB_RECURSE" for recursive search
# "GLOB" for non-recursive search
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_INC_PATH "../../../rocksdb")
set(MY_PROJ_LIB_PATH "../../../rocksdb")

set(MY_PROJ_LIBS
s2
s2testing
pegasus_geo_lib
pegasus_client_static
fmt
rocksdb
z
bz2
snappy)

set(MY_BOOST_PACKAGES system filesystem)

set(MY_BINPLACES "config.ini")

dsn_add_executable()
109 changes: 109 additions & 0 deletions src/geo/bench/bench.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2018-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "geo/lib/geo_client.h"

#include <iostream>
#include <s2/s2testing.h>
#include <s2/s2cell.h>
#include "monitoring/histogram.h"
#include <dsn/utility/strings.h>
#include <rocksdb/env.h>
#include <dsn/utility/string_conv.h>
#include <dsn/cpp/clientlet.h>

static const int data_count = 10000;

int main(int argc, char **argv)
{
if (argc != 7) {
std::cerr << "USAGE: " << argv[0]
<< " <cluster_name> <app_name> <geo_app_name> <radius> <test_count> <max_level>"
<< std::endl;
return -1;
}

std::string cluster_name = argv[1];
std::string app_name = argv[2];
std::string geo_app_name = argv[3];
double radius = 0.0;
if (!dsn::buf2double(argv[4], radius)) {
std::cerr << "radius is invalid: " << argv[4] << std::endl;
return -1;
}
int test_count = 2000;
if (!dsn::buf2int32(argv[5], test_count)) {
std::cerr << "test_count is invalid: " << argv[5] << std::endl;
return -1;
}
int max_level = 16;
if (!dsn::buf2int32(argv[6], max_level)) {
std::cerr << "max_level is invalid: " << argv[6] << std::endl;
return -1;
}
pegasus::geo::geo_client my_geo("config.ini",
cluster_name.c_str(),
app_name.c_str(),
geo_app_name.c_str(),
new pegasus::geo::latlng_extractor_for_lbs());
my_geo.set_max_level(max_level);

// cover beijing 5th ring road
S2LatLngRect rect(S2LatLng::FromDegrees(39.810151, 116.194511),
S2LatLng::FromDegrees(40.028697, 116.535087));

// generate data for test
// for (int i = 0; i < data_count; ++i) {
// S2LatLng latlng(S2Testing::SamplePoint(rect));
// std::string id = std::to_string(i);
// std::string value = id + "|2018-06-05 12:00:00|2018-06-05 13:00:00|abcdefg|" +
// std::to_string(latlng.lng().degrees()) + "|" +
// std::to_string(latlng.lat().degrees()) + "|123.456|456.789|0|-1";
//
// int ret = my_geo.set(id, "", value, 1000);
// if (ret != pegasus::PERR_OK) {
// std::cerr << "set data failed. error=" << ret << std::endl;
// }
// }

rocksdb::HistogramImpl latency_histogram;
rocksdb::HistogramImpl result_count_histogram;
rocksdb::Env *env = rocksdb::Env::Default();
uint64_t start = env->NowNanos();
std::atomic<uint64_t> count(test_count);
dsn::utils::notify_event get_completed;
// test search_radial by lat & lng
for (int i = 0; i < test_count; ++i) {
S2LatLng latlng(S2Testing::SamplePoint(rect));

uint64_t start_nanos = env->NowNanos();
my_geo.async_search_radial(
latlng.lat().degrees(),
latlng.lng().degrees(),
radius,
-1,
pegasus::geo::geo_client::SortType::random,
500,
[&, start_nanos](int error_code, std::list<pegasus::geo::SearchResult> &&results) {
latency_histogram.Add(env->NowNanos() - start_nanos);
result_count_histogram.Add(results.size());
uint64_t left = count.fetch_sub(1);
if (left == 1) {
get_completed.notify();
}
});
}
std::cout << "get_completed.wait" << std::endl;
get_completed.wait();
uint64_t end = env->NowNanos();

std::cout << "start time: " << start << ", end time: " << end
<< ", QPS: " << test_count / ((end - start) / 1e9) << std::endl;
std::cout << "latency_histogram: " << std::endl;
std::cout << latency_histogram.ToString() << std::endl;
std::cout << "result_count_histogram: " << std::endl;
std::cout << result_count_histogram.ToString() << std::endl;

return 0;
}
Loading

0 comments on commit b3f3584

Please sign in to comment.