Generate diskann cache asynchronously. #191
Conversation
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: cqy123456.
Force-pushed af5fd9d to 3b9ac06 (compare)
@@ -195,6 +196,7 @@ namespace diskann {
     std::string disk_index_file;
     std::vector<std::pair<_u32, _u32>> node_visit_counter;
+    _u32 search_counter = 0;
std::atomic<_u32>
updated
thirdparty/DiskANN/include/utils.h (Outdated)
@@ -885,7 +885,7 @@ namespace diskann {
     return disk_index_filename + "_centroids.bin";
   }

-  inline std::string get_sample_data_filename(const std::string& prefix) {
+  inline std::string get_sample_data_filename(std::string prefix) {
no, please keep it as const std::string&
:)
updated
     node_list.clear();
     node_list.shrink_to_fit();
     node_list.reserve(num_nodes_to_cache);
     for (_u64 i = 0; i < num_nodes_to_cache; i++) {
       node_list.push_back(this->node_visit_counter[i].first);
     }
-    this->count_visited_nodes = false;
+    reinterpret_cast<std::atomic<bool> &>(this->count_visited_nodes)
this is SUPER dirty, I'd turn count_visited_nodes into std::atomic<bool>
updated
@@ -233,6 +235,8 @@ namespace diskann {
     // coord_cache
     T * coord_cache_buf = nullptr;
     tsl::robin_map<_u32, T *> coord_cache;
+    Semaphore semaph;
+    bool asyn_generate_cache = false;
std::atomic<bool> asyn_generate_cache
updated
  #include <condition_variable>

  namespace diskann {
    class Semaphore {
I'd use the syntax from https://en.cppreference.com/w/cpp/thread/counting_semaphore, so that we could transition the code to C++20 more easily
@@ -128,6 +127,8 @@ namespace diskann {
     DISKANN_DLLEXPORT diskann::Metric get_metric() const noexcept;

+    DISKANN_DLLEXPORT void set_asyn_cache_flag(const bool flag);
nit: I personally prefer async to asyn, but feel free to keep it as is
updated
      } catch (const std::exception &e) {
        LOG_KNOWHERE_ERROR_ << "DiskANN Exception: " << e.what();
      }
    });
nit: incorrect indentation
updated
    this->node_visit_counter.clear();
    this->node_visit_counter.resize(this->num_points);
    reinterpret_cast<std::atomic<bool> &>(this->count_visited_nodes).exchange(true);
store is slightly better than exchange as we don't need the old value, after we switch to std::atomic.
Is atomic read-modify-write more suitable for modifying count_visited_nodes than write atomicity?
Here we don't use the value returned by exchange, and since both exchange (read-modify-write) and store (write) are atomic, there is no difference in terms of effect. Why is atomic read-modify-write more suitable?
I think exchange is: lock -> read value to register -> write to mem -> unlock; store is: lock -> write to mem -> unlock. The update of the counter is not strict, and store can be faster, so I updated the code.
@@ -195,6 +196,7 @@ namespace diskann {
     std::string disk_index_file;
     std::vector<std::pair<_u32, _u32>> node_visit_counter;
+    _u32 search_counter = 0;
CMIIW: the warm up method performs sample_num searches and caches the most visited num_nodes_to_cache nodes. If user-requested searches come in during that time, those are also counted towards sample_num and will affect which nodes are added to nodes_to_cache. The searches performed by the warm up method are all in a single thread, but to protect against concurrent access by user searches from other threads, we used several atomic primitives to protect search_counter, node_visit_counter and others.

I wonder, do we have to count user searches during the warm up? If not, we can greatly simplify the code. Since the warm up searches are serial, we could just count them without using any atomics.
template <bool warm_up = false>
void cached_beam_search(q) {
  if constexpr (warm_up) {
    // no need to protect node_visit_counter
    count_node_visit();
  }
}

void generate_cache_list_from_sample_queries() {
  // single thread operation:
  // perform all `sample_num` searches, not counting user searches
  for (q : warm_up_qs) {
    cached_beam_search<true>(q);
  }
  top_nodes = sorted(node_visit_counter)[:num_nodes_to_cache];
  load_into_cache(top_nodes);
}

void normal_search(q) {
  // multiple threads may enter here at the same time
  // user searches don't affect `node_visit_counter`
  cached_beam_search(q);
}
The current code is not completely thread safe anyway: when we have finished warm up, before we flip the bit with reinterpret_cast<std::atomic<bool> &>(this->count_visited_nodes).exchange(false), other threads may continue updating search_counter and node_visit_counter while we are sorting node_visit_counter.
I thought it is better to use user queries to generate the cache, with the sampled vectors used as backup search queries. So I use a counter to record the number of searches; the cache can only be generated after the counter reaches a certain number.
Force-pushed 8c26982 to fbf7398 (compare)
    this->node_visit_counter.clear();
    this->node_visit_counter.resize(this->num_points);
    reinterpret_cast<std::atomic<bool> &>(this->count_visited_nodes).exchange(true);
Here we don't use the value returned by exchange, and since both exchange (read-modify-write) and store (write) are atomic, there is no difference in terms of effect. Why is atomic read-modify-write more suitable?
      return;
    }

    this->count_visited_nodes.exchange(false);
Simply moving this line doesn't help guarantee "stop updating this->search_counter and this->node_visit_counter once we have reached sample_num searches".

But since the stop condition this->search_counter.load() == sample_num is not strict (it's ok to search more than sample_num times to build the cache), we can keep the code as is. But please add a comment like "it is intentional and ok that search_counter/node_visit_counter may continue being updated after search_counter has reached sample_num, before we flip count_visited_nodes".
If count_visited_nodes == true, diskann will update node_visit_counter in each iteration and search_counter in each query. So when the cache generation task is done, I want to release the node_visit_counter memory and save some search time by no longer updating node_visit_counter and search_counter.
My point is: in general, sync mechanisms are used to guarantee strong correctness, but here we use locks/atomics while tolerating a degree of inaccuracy (as you mentioned in another comment, the update of the counter is not strict). So we should leave a comment explicitly stating that.
Force-pushed fbf7398 to 97da310 (compare)
/lgtm
@@ -332,32 +338,47 @@ namespace diskann {
       return;
     }

-    std::vector<int64_t> tmp_result_ids_64(sample_num, 0);
-    std::vector<float> tmp_result_dists(sample_num, 0);
+    int64_t tmp_result_ids_64;
Any race condition risks?
Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
Force-pushed 97da310 to fcff381 (compare)
New changes are detected. LGTM label has been removed.
This PR removes the previous sync cache generation function.

The prepare phase takes a thread from the thread pool to do the following tasks in the background:

If diskann calls the destructor, this background thread will:

I used sift1m (nb = 1M, nq = 10000, dim = 128) for the experiments.

Diskann parameters:

The experimental plan is as follows:

Previous version results:
sync cache generate: 22.35s
search VPS: 5119.53

Latest version results:
async cache generate: 33.23s
search VPS without cache: 3059
search VPS with cache: 6000.12

The worst case is that there is no input query vector and the background thread completes the task using the sampling queries alone; that worst case would take 108s to generate the cache.