Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions src/index/diskann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,14 @@ DiskANN::DiskANN(DiskannParameters& diskann_params, const IndexCommonParam& inde

this->feature_list_ = std::make_shared<IndexFeatureList>();
this->init_feature_list();
result_queues_.try_emplace(STATSTIC_KNN_IO);
result_queues_.try_emplace(STATSTIC_KNN_TIME);
result_queues_.try_emplace(STATSTIC_KNN_IO_TIME);
result_queues_.try_emplace(STATSTIC_RANGE_IO);
result_queues_.try_emplace(STATSTIC_RANGE_HOP);
result_queues_.try_emplace(STATSTIC_RANGE_TIME);
result_queues_.try_emplace(STATSTIC_RANGE_CACHE_HIT);
result_queues_.try_emplace(STATSTIC_RANGE_IO_TIME);
}

tl::expected<std::vector<int64_t>, Error>
Expand Down Expand Up @@ -517,13 +525,14 @@ DiskANN::knn_search(const DatasetPtr& query,
query_stats.data() + i);
}
}
{
std::lock_guard<std::mutex> lock(stats_mutex_);
result_queues_[STATSTIC_KNN_IO].Push(static_cast<float>(query_stats[i].n_ios));
result_queues_[STATSTIC_KNN_TIME].Push(static_cast<float>(time_cost));
result_queues_[STATSTIC_KNN_IO_TIME].Push(
(query_stats[i].io_us / static_cast<float>(query_stats[i].n_ios)) /
MACRO_TO_MILLI);
result_queues_.at(STATSTIC_KNN_IO).Push(static_cast<float>(query_stats[i].n_ios));
result_queues_.at(STATSTIC_KNN_TIME).Push(static_cast<float>(time_cost));
auto& io_time_queue = result_queues_.at(STATSTIC_KNN_IO_TIME);
if (query_stats[i].n_ios > 0) {
io_time_queue.Push((query_stats[i].io_us / static_cast<float>(query_stats[i].n_ios)) /
MACRO_TO_MILLI);
} else {
io_time_queue.Push(0.0F);
}

} catch (const std::runtime_error& e) {
Expand Down Expand Up @@ -647,16 +656,17 @@ DiskANN::range_search(const DatasetPtr& query,
params.use_async_io,
&query_stats);
}
{
std::lock_guard<std::mutex> lock(stats_mutex_);

result_queues_[STATSTIC_RANGE_IO].Push(static_cast<float>(query_stats.n_ios));
result_queues_[STATSTIC_RANGE_HOP].Push(static_cast<float>(query_stats.n_hops));
result_queues_[STATSTIC_RANGE_TIME].Push(static_cast<float>(time_cost));
result_queues_[STATSTIC_RANGE_CACHE_HIT].Push(
static_cast<float>(query_stats.n_cache_hits));
result_queues_[STATSTIC_RANGE_IO_TIME].Push(
(query_stats.io_us / static_cast<float>(query_stats.n_ios)) / MACRO_TO_MILLI);
result_queues_.at(STATSTIC_RANGE_IO).Push(static_cast<float>(query_stats.n_ios));
result_queues_.at(STATSTIC_RANGE_HOP).Push(static_cast<float>(query_stats.n_hops));
result_queues_.at(STATSTIC_RANGE_TIME).Push(static_cast<float>(time_cost));
result_queues_.at(STATSTIC_RANGE_CACHE_HIT)
.Push(static_cast<float>(query_stats.n_cache_hits));
auto& range_io_time_queue = result_queues_.at(STATSTIC_RANGE_IO_TIME);
if (query_stats.n_ios > 0) {
range_io_time_queue.Push((query_stats.io_us / static_cast<float>(query_stats.n_ios)) /
MACRO_TO_MILLI);
} else {
range_io_time_queue.Push(0.0F);
}
} catch (const std::runtime_error& e) {
LOG_ERROR_AND_RETURNS(
Expand Down Expand Up @@ -1082,11 +1092,8 @@ DiskANN::GetStats() const {
j[STATSTIC_INDEX_NAME].SetString(INDEX_DISKANN);
j[STATSTIC_MEMORY].SetInt(GetMemoryUsage());

{
std::lock_guard<std::mutex> lock(stats_mutex_);
for (auto& item : result_queues_) {
j[item.first].SetFloat(item.second.GetAvgResult());
}
for (auto& item : result_queues_) {
j[item.first].SetFloat(item.second.GetAvgResult());
}

return j.Dump(4);
Expand Down
14 changes: 3 additions & 11 deletions src/index/hnsw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ HNSW::HNSW(HnswParameters hnsw_params, const IndexCommonParam& index_common_para
true);

this->init_feature_list();
result_queues_.try_emplace(STATSTIC_KNN_TIME);
}

tl::expected<std::vector<int64_t>, Error>
Expand Down Expand Up @@ -300,11 +301,7 @@ HNSW::knn_search(const DatasetPtr& query,
e.what());
}

// update stats
{
std::lock_guard<std::mutex> lock(stats_mutex_);
result_queues_[STATSTIC_KNN_TIME].Push(static_cast<float>(time_cost));
}
result_queues_.at(STATSTIC_KNN_TIME).Push(static_cast<float>(time_cost));

// return result
if (results.empty()) {
Expand Down Expand Up @@ -427,11 +424,7 @@ HNSW::range_search(const DatasetPtr& query,
e.what());
}

// update stats
{
std::lock_guard<std::mutex> lock(stats_mutex_);
result_queues_[STATSTIC_KNN_TIME].Push(static_cast<float>(time_cost));
}
result_queues_.at(STATSTIC_KNN_TIME).Push(static_cast<float>(time_cost));

// return result
auto target_size = static_cast<int64_t>(results.size());
Expand Down Expand Up @@ -716,7 +709,6 @@ HNSW::GetStats() const {
j[STATSTIC_MEMORY].SetInt(GetMemoryUsage());

{
std::lock_guard<std::mutex> lock(stats_mutex_);
for (auto& item : result_queues_) {
j[item.first].SetFloat(item.second.GetAvgResult());
}
Expand Down
10 changes: 7 additions & 3 deletions src/utils/window_result_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ WindowResultQueue::WindowResultQueue() {

void
WindowResultQueue::Push(float value) {
std::lock_guard<std::mutex> lock(queue_mutex_);
uint64_t pos = count_.load(std::memory_order_relaxed);
uint64_t window_size = queue_.size();
queue_[count_ % window_size] = value;
count_++;
queue_[pos % window_size] = value;
count_.fetch_add(1, std::memory_order_relaxed);
}
Comment on lines 27 to 33

float
WindowResultQueue::GetAvgResult() const {
uint64_t statistic_num = std::min<uint64_t>(count_, queue_.size());
std::lock_guard<std::mutex> lock(queue_mutex_);
uint64_t statistic_num =
std::min<uint64_t>(count_.load(std::memory_order_relaxed), queue_.size());
Comment on lines +38 to +39
Comment on lines +38 to +39
Comment on lines +38 to +39
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

statistic_num is computed before acquiring queue_mutex_, so it can be derived from a count_ value that doesn’t correspond to the buffer state protected by the mutex (especially given Push() currently increments count_ before writing). Compute statistic_num while holding queue_mutex_ (and, if keeping a separate published counter, base it on that) so the “how many samples are valid” decision is consistent with the snapshot being summed.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit db15fa2. statistic_num is now calculated while holding queue_mutex_ to ensure consistency with the buffer state.

if (statistic_num == 0) {
return 0.0F;
}
Expand Down
5 changes: 4 additions & 1 deletion src/utils/window_result_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

#pragma once

#include <atomic>
#include <mutex>
Comment on lines +18 to +19
#include <string>
#include <vector>

Expand All @@ -30,7 +32,8 @@ class WindowResultQueue {
GetAvgResult() const;

private:
uint64_t count_ = 0;
std::atomic<uint64_t> count_{0};
std::vector<float> queue_;
mutable std::mutex queue_mutex_;
Comment on lines +35 to +37
};
} // namespace vsag
Loading